Skip to content

Commit

Permalink
feat: Enhancements to MessageQueue Module with Decorators (twentyhq#5657
Browse files Browse the repository at this point in the history
)

### Overview

This PR introduces significant enhancements to the MessageQueue module
by integrating `@Processor`, `@Process`, and `@InjectMessageQueue`
decorators. These changes streamline the process of defining and
managing queue processors and job handlers, and also allow for
request-scoped handlers, improving compatibility with services that rely
on scoped providers like TwentyORM repositories.

### Key Features

1. **Decorator-based Job Handling**: Use `@Processor` and `@Process`
decorators to define job handlers declaratively.
2. **Request Scope Support**: Job handlers can be scoped per request,
enhancing integration with request-scoped services.

### Usage

#### Defining Processors and Job Handlers

The `@Processor` decorator is used to define a class that processes jobs
for a specific queue. The `@Process` decorator is applied to methods
within this class to define specific job handlers.

##### Example 1: Specific Job Handlers

```typescript
import { Processor, Process, InjectMessageQueue } from 'src/engine/integrations/message-queue';

@processor('taskQueue')
export class TaskProcessor {

  @process('taskA')
  async handleTaskA(job: { id: string, data: any }) {
    console.log(`Handling task A with data:`, job.data);
    // Logic for task A
  }

  @process('taskB')
  async handleTaskB(job: { id: string, data: any }) {
    console.log(`Handling task B with data:`, job.data);
    // Logic for task B
  }
}
```

In the example above, `TaskProcessor` is responsible for processing jobs
in the `taskQueue`. The `handleTaskA` method will only be called for
jobs with the name `taskA`, while `handleTaskB` will be called for
`taskB` jobs.

##### Example 2: General Job Handler

```typescript
import { Processor, Process, InjectMessageQueue } from 'src/engine/integrations/message-queue';

@processor('generalQueue')
export class GeneralProcessor {

  @process()
  async handleAnyJob(job: { id: string, name: string, data: any }) {
    console.log(`Handling job ${job.name} with data:`, job.data);
    // Logic for any job
  }
}
```

In this example, `GeneralProcessor` handles all jobs in the
`generalQueue`, regardless of the job name. The `handleAnyJob` method
will be invoked for every job added to the `generalQueue`.

#### Adding Jobs to a Queue

You can use the `@InjectMessageQueue` decorator to inject a queue into a
service and add jobs to it.

##### Example:

```typescript
import { Injectable } from '@nestjs/common';
import { InjectMessageQueue, MessageQueue } from 'src/engine/integrations/message-queue';

@Injectable()
export class TaskService {
  constructor(
    @InjectMessageQueue('taskQueue') private readonly taskQueue: MessageQueue,
  ) {}

  async addTaskA(data: any) {
    await this.taskQueue.add('taskA', data);
  }

  async addTaskB(data: any) {
    await this.taskQueue.add('taskB', data);
  }
}
```

In this example, `TaskService` adds jobs to the `taskQueue`. The
`addTaskA` and `addTaskB` methods add jobs named `taskA` and `taskB`,
respectively, to the queue.

#### Using Scoped Job Handlers

To utilize request-scoped job handlers, specify the scope in the
`@Processor` decorator. This is particularly useful for services that
use scoped repositories like those in TwentyORM.

##### Example:

```typescript
import { Processor, Process, InjectMessageQueue, Scope } from 'src/engine/integrations/message-queue';

@processor({ name: 'scopedQueue', scope: Scope.REQUEST })
export class ScopedTaskProcessor {

  @process('scopedTask')
  async handleScopedTask(job: { id: string, data: any }) {
    console.log(`Handling scoped task with data:`, job.data);
    // Logic for scoped task, which might use request-scoped services
  }
}
```

Here, the `ScopedTaskProcessor` is associated with `scopedQueue` and
operates with request scope. This setup is essential when the job
handler relies on services that need to be instantiated per request,
such as scoped repositories.

### Migration Notes

- **Decorators**: Refactor job handlers to use `@Processor` and
`@Process` decorators.
- **Request Scope**: Utilize the scope option in `@Processor` if your
job handlers depend on request-scoped services.

Fix twentyhq#5628

---------

Co-authored-by: Weiko <[email protected]>
  • Loading branch information
magrinj and Weiko authored Jun 17, 2024
1 parent 341eafb commit de26129
Show file tree
Hide file tree
Showing 92 changed files with 952 additions and 521 deletions.
1 change: 1 addition & 0 deletions packages/twenty-server/@types/express.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ declare module 'express-serve-static-core' {
interface Request {
user?: User;
workspace?: Workspace;
workspaceId?: string;
cacheVersion?: string | null;
}
}
2 changes: 2 additions & 0 deletions packages/twenty-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"jsdom": "~22.1.0",
"jwt-decode": "^4.0.0",
"lodash.differencewith": "^4.5.0",
"lodash.omitby": "^4.6.0",
"lodash.uniq": "^4.5.0",
"lodash.uniqby": "^4.7.0",
"passport": "^0.7.0",
Expand All @@ -40,6 +41,7 @@
"@types/lodash.isequal": "^4.5.8",
"@types/lodash.isobject": "^3.0.7",
"@types/lodash.omit": "^4.5.9",
"@types/lodash.omitby": "^4.6.9",
"@types/lodash.snakecase": "^4.1.7",
"@types/lodash.uniq": "^4.5.9",
"@types/lodash.uniqby": "^4.7.9",
Expand Down
11 changes: 10 additions & 1 deletion packages/twenty-server/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import { GraphQLConfigModule } from 'src/engine/api/graphql/graphql-config/graph
import { GraphQLConfigService } from 'src/engine/api/graphql/graphql-config/graphql-config.service';
import { WorkspaceCacheVersionModule } from 'src/engine/metadata-modules/workspace-cache-version/workspace-cache-version.module';
import { GraphQLHydrateRequestFromTokenMiddleware } from 'src/engine/middlewares/graphql-hydrate-request-from-token.middleware';
import { MessageQueueModule } from 'src/engine/integrations/message-queue/message-queue.module';
import { MessageQueueDriverType } from 'src/engine/integrations/message-queue/interfaces';

import { CoreEngineModule } from './engine/core-modules/core-engine.module';
import { IntegrationsModule } from './engine/integrations/integrations.module';
import { CoreEngineModule } from './engine/core-modules/core-engine.module';

@Module({
imports: [
Expand Down Expand Up @@ -72,6 +74,13 @@ export class AppModule {
);
}

// Messaque Queue explorer only for sync driver
// Maybe we don't need to conditionaly register the explorer, because we're creating a jobs module
// that will expose classes that are only used in the queue worker
if (process.env.MESSAGE_QUEUE_TYPE === MessageQueueDriverType.Sync) {
modules.push(MessageQueueModule.registerExplorer());
}

return modules;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { Inject } from '@nestjs/common';

import { Command, CommandRunner } from 'nest-commander';

import { dataSeedDemoWorkspaceCronPattern } from 'src/database/commands/data-seed-demo-workspace/crons/data-seed-demo-workspace-cron-pattern';
import { DataSeedDemoWorkspaceJob } from 'src/database/commands/data-seed-demo-workspace/jobs/data-seed-demo-workspace.job';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';

Expand All @@ -13,7 +12,7 @@ import { MessageQueueService } from 'src/engine/integrations/message-queue/servi
})
export class StartDataSeedDemoWorkspaceCronCommand extends CommandRunner {
constructor(
@Inject(MessageQueue.cronQueue)
@InjectMessageQueue(MessageQueue.cronQueue)
private readonly messageQueueService: MessageQueueService,
) {
super();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { Inject } from '@nestjs/common';

import { Command, CommandRunner } from 'nest-commander';

import { dataSeedDemoWorkspaceCronPattern } from 'src/database/commands/data-seed-demo-workspace/crons/data-seed-demo-workspace-cron-pattern';
import { DataSeedDemoWorkspaceJob } from 'src/database/commands/data-seed-demo-workspace/jobs/data-seed-demo-workspace.job';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';

Expand All @@ -13,7 +12,7 @@ import { MessageQueueService } from 'src/engine/integrations/message-queue/servi
})
export class StopDataSeedDemoWorkspaceCronCommand extends CommandRunner {
constructor(
@Inject(MessageQueue.cronQueue)
@InjectMessageQueue(MessageQueue.cronQueue)
private readonly messageQueueService: MessageQueueService,
) {
super();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import { Injectable } from '@nestjs/common';

import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';

import { DataSeedDemoWorkspaceService } from 'src/database/commands/data-seed-demo-workspace/services/data-seed-demo-workspace.service';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';

@Injectable()
export class DataSeedDemoWorkspaceJob implements MessageQueueJob<undefined> {
@Processor(MessageQueue.cronQueue)
export class DataSeedDemoWorkspaceJob {
constructor(
private readonly dataSeedDemoWorkspaceService: DataSeedDemoWorkspaceService,
) {}

@Process(DataSeedDemoWorkspaceJob.name)
async handle(): Promise<void> {
await this.dataSeedDemoWorkspaceService.seedDemo();
}
Expand Down
4 changes: 4 additions & 0 deletions packages/twenty-server/src/database/typeorm/typeorm.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { TypeOrmModule, TypeOrmModuleOptions } from '@nestjs/typeorm';

import { typeORMCoreModuleOptions } from 'src/database/typeorm/core/core.datasource';
import { EnvironmentModule } from 'src/engine/integrations/environment/environment.module';
import { TwentyORMModule } from 'src/engine/twenty-orm/twenty-orm.module';

import { TypeORMService } from './typeorm.service';

Expand All @@ -28,6 +29,9 @@ const coreTypeORMFactory = async (): Promise<TypeOrmModuleOptions> => ({
useFactory: coreTypeORMFactory,
name: 'core',
}),
TwentyORMModule.register({
workspaceEntities: ['dist/src/**/*.workspace-entity{.ts,.js}'],
}),
EnvironmentModule,
],
providers: [TypeORMService],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { Inject } from '@nestjs/common';

import { Command, CommandRunner, Option } from 'nest-commander';

import {
RecordPositionBackfillJob,
RecordPositionBackfillJobData,
} from 'src/engine/api/graphql/workspace-query-runner/jobs/record-position-backfill.job';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';

Expand All @@ -20,7 +19,7 @@ export type RecordPositionBackfillCommandOptions = {
})
export class RecordPositionBackfillCommand extends CommandRunner {
constructor(
@Inject(MessageQueue.recordPositionBackfillQueue)
@InjectMessageQueue(MessageQueue.recordPositionBackfillQueue)
private readonly messageQueueService: MessageQueueService,
) {
super();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { Inject, Injectable, Logger } from '@nestjs/common';
import { Logger } from '@nestjs/common';

import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';
import { ObjectMetadataInterface } from 'src/engine/metadata-modules/field-metadata/interfaces/object-metadata.interface';

import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
Expand All @@ -11,6 +10,9 @@ import {
CallWebhookJob,
CallWebhookJobData,
} from 'src/engine/api/graphql/workspace-query-runner/jobs/call-webhook.job';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';

export enum CallWebhookJobsJobOperation {
create = 'create',
Expand All @@ -25,19 +27,18 @@ export type CallWebhookJobsJobData = {
operation: CallWebhookJobsJobOperation;
};

@Injectable()
export class CallWebhookJobsJob
implements MessageQueueJob<CallWebhookJobsJobData>
{
@Processor(MessageQueue.webhookQueue)
export class CallWebhookJobsJob {
private readonly logger = new Logger(CallWebhookJobsJob.name);

constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
private readonly dataSourceService: DataSourceService,
@Inject(MessageQueue.webhookQueue)
@InjectMessageQueue(MessageQueue.webhookQueue)
private readonly messageQueueService: MessageQueueService,
) {}

@Process(CallWebhookJobsJob.name)
async handle(data: CallWebhookJobsJobData): Promise<void> {
const dataSourceMetadata =
await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { Injectable, Logger } from '@nestjs/common';
import { Logger } from '@nestjs/common';
import { HttpService } from '@nestjs/axios';

import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';

export type CallWebhookJobData = {
targetUrl: string;
Expand All @@ -13,12 +15,13 @@ export type CallWebhookJobData = {
record: any;
};

@Injectable()
export class CallWebhookJob implements MessageQueueJob<CallWebhookJobData> {
@Processor(MessageQueue.webhookQueue)
export class CallWebhookJob {
private readonly logger = new Logger(CallWebhookJob.name);

constructor(private readonly httpService: HttpService) {}

@Process(CallWebhookJob.name)
async handle(data: CallWebhookJobData): Promise<void> {
try {
await this.httpService.axiosRef.post(data.targetUrl, data);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
import { Injectable } from '@nestjs/common';

import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';

import { RecordPositionBackfillService } from 'src/engine/api/graphql/workspace-query-runner/services/record-position-backfill-service';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';

export type RecordPositionBackfillJobData = {
workspaceId: string;
dryRun: boolean;
};

@Injectable()
export class RecordPositionBackfillJob
implements MessageQueueJob<RecordPositionBackfillJobData>
{
@Processor(MessageQueue.recordPositionBackfillQueue)
export class RecordPositionBackfillJob {
constructor(
private readonly recordPositionBackfillService: RecordPositionBackfillService,
) {}

@Process(RecordPositionBackfillJob.name)
async handle(data: RecordPositionBackfillJobData): Promise<void> {
this.recordPositionBackfillService.backfill(data.workspaceId, data.dryRun);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,6 @@ import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/works
RecordPositionBackfillModule,
HttpModule,
],
providers: [
{
provide: CallWebhookJobsJob.name,
useClass: CallWebhookJobsJob,
},
{
provide: CallWebhookJob.name,
useClass: CallWebhookJob,
},
{
provide: RecordPositionBackfillJob.name,
useClass: RecordPositionBackfillJob,
},
],
providers: [CallWebhookJobsJob, CallWebhookJob, RecordPositionBackfillJob],
})
export class WorkspaceQueryRunnerJobModule {}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Inject, Injectable } from '@nestjs/common';
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';

import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
Expand All @@ -8,12 +8,13 @@ import { objectRecordChangedValues } from 'src/engine/integrations/event-emitter
import { ObjectRecordUpdateEvent } from 'src/engine/integrations/event-emitter/types/object-record-update.event';
import { ObjectRecordBaseEvent } from 'src/engine/integrations/event-emitter/types/object-record.base.event';
import { UpsertTimelineActivityFromInternalEvent } from 'src/modules/timeline/jobs/upsert-timeline-activity-from-internal-event.job';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { CreateAuditLogFromInternalEvent } from 'src/modules/timeline/jobs/create-audit-log-from-internal-event';

@Injectable()
export class EntityEventsToDbListener {
constructor(
@Inject(MessageQueue.entityEventsToDbQueue)
@InjectMessageQueue(MessageQueue.entityEventsToDbQueue)
private readonly messageQueueService: MessageQueueService,
) {}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import {
BadRequestException,
Inject,
Injectable,
Logger,
RequestTimeoutException,
Expand Down Expand Up @@ -52,6 +51,7 @@ import { assertMutationNotOnRemoteObject } from 'src/engine/metadata-modules/obj
import { STANDARD_OBJECT_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids';
import { assertIsValidUuid } from 'src/engine/api/graphql/workspace-query-runner/utils/assert-is-valid-uuid.util';
import { isQueryTimeoutError } from 'src/engine/utils/query-timeout.util';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';

import { WorkspaceQueryRunnerOptions } from './interfaces/query-runner-option.interface';
import {
Expand All @@ -72,7 +72,7 @@ export class WorkspaceQueryRunnerService {
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
private readonly queryRunnerArgsFactory: QueryRunnerArgsFactory,
private readonly queryResultGettersFactory: QueryResultGettersFactory,
@Inject(MessageQueue.webhookQueue)
@InjectMessageQueue(MessageQueue.webhookQueue)
private readonly messageQueueService: MessageQueueService,
private readonly eventEmitter: EventEmitter2,
private readonly workspacePreQueryHookService: WorkspacePreQueryHookService,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Injectable, Inject } from '@nestjs/common';
import { Injectable } from '@nestjs/common';

import { EntityManager } from 'typeorm';
import { v4 } from 'uuid';
Expand Down Expand Up @@ -34,15 +34,16 @@ import {
MessagingMessageListFetchJob,
MessagingMessageListFetchJobData,
} from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';

@Injectable()
export class GoogleAPIsService {
constructor(
private readonly dataSourceService: DataSourceService,
private readonly typeORMService: TypeORMService,
@Inject(MessageQueue.messagingQueue)
@InjectMessageQueue(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
@Inject(MessageQueue.calendarQueue)
@InjectMessageQueue(MessageQueue.calendarQueue)
private readonly calendarQueueService: MessageQueueService,
private readonly environmentService: EnvironmentService,
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
import { Injectable, Logger } from '@nestjs/common';

import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';
import { Logger } from '@nestjs/common';

import { BillingService } from 'src/engine/core-modules/billing/billing.service';
import { UserWorkspaceService } from 'src/engine/core-modules/user-workspace/user-workspace.service';
import { StripeService } from 'src/engine/core-modules/billing/stripe/stripe.service';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
export type UpdateSubscriptionJobData = { workspaceId: string };
@Injectable()
export class UpdateSubscriptionJob
implements MessageQueueJob<UpdateSubscriptionJobData>
{

@Processor(MessageQueue.billingQueue)
export class UpdateSubscriptionJob {
protected readonly logger = new Logger(UpdateSubscriptionJob.name);

constructor(
private readonly billingService: BillingService,
private readonly userWorkspaceService: UserWorkspaceService,
private readonly stripeService: StripeService,
) {}

@Process(UpdateSubscriptionJob.name)
async handle(data: UpdateSubscriptionJobData): Promise<void> {
const workspaceMembersCount =
await this.userWorkspaceService.getWorkspaceMemberCount(data.workspaceId);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Inject, Injectable } from '@nestjs/common';
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';

import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
Expand All @@ -10,11 +10,12 @@ import {
UpdateSubscriptionJobData,
} from 'src/engine/core-modules/billing/jobs/update-subscription.job';
import { EnvironmentService } from 'src/engine/integrations/environment/environment.service';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';

@Injectable()
export class BillingWorkspaceMemberListener {
constructor(
@Inject(MessageQueue.billingQueue)
@InjectMessageQueue(MessageQueue.billingQueue)
private readonly messageQueueService: MessageQueueService,
private readonly environmentService: EnvironmentService,
) {}
Expand Down
Loading

0 comments on commit de26129

Please sign in to comment.