Skip to content

Commit f57579c

Browse files
committed
feat: refactor all jobs
1 parent cbbf098 commit f57579c

File tree

54 files changed

+377
-487
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+377
-487
lines changed

packages/twenty-server/src/app.module.ts

+9-1
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@ import { GraphQLConfigModule } from 'src/engine/api/graphql/graphql-config/graph
2222
import { GraphQLConfigService } from 'src/engine/api/graphql/graphql-config/graphql-config.service';
2323
import { WorkspaceCacheVersionModule } from 'src/engine/metadata-modules/workspace-cache-version/workspace-cache-version.module';
2424
import { GraphQLHydrateRequestFromTokenMiddleware } from 'src/engine/middlewares/graphql-hydrate-request-from-token.middleware';
25+
import { MessageQueueModule } from 'src/engine/integrations/message-queue/message-queue.module';
2526

26-
import { CoreEngineModule } from './engine/core-modules/core-engine.module';
2727
import { IntegrationsModule } from './engine/integrations/integrations.module';
28+
import { CoreEngineModule } from './engine/core-modules/core-engine.module';
2829

2930
@Module({
3031
imports: [
@@ -74,6 +75,13 @@ export class AppModule {
7475
);
7576
}
7677

78+
// Messaque Queue explorer only for sync driver
79+
// Maybe we don't need to conditionaly register the explorer, because we're creating a jobs module
80+
// that will expose classes that are only used in the queue worker
81+
if (process.env.MESSAGE_QUEUE_TYPE === 'sync') {
82+
modules.push(MessageQueueModule.registerExplorer());
83+
}
84+
7785
return modules;
7886
}
7987

packages/twenty-server/src/database/commands/data-seed-demo-workspace/jobs/data-seed-demo-workspace.job.ts

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
1-
import { Injectable } from '@nestjs/common';
2-
3-
import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';
4-
51
import { DataSeedDemoWorkspaceService } from 'src/database/commands/data-seed-demo-workspace/services/data-seed-demo-workspace.service';
2+
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
3+
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
4+
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
65

7-
@Injectable()
8-
export class DataSeedDemoWorkspaceJob implements MessageQueueJob<undefined> {
6+
@Processor(MessageQueue.cronQueue)
7+
export class DataSeedDemoWorkspaceJob {
98
constructor(
109
private readonly dataSeedDemoWorkspaceService: DataSeedDemoWorkspaceService,
1110
) {}
1211

12+
@Process()
1313
async handle(): Promise<void> {
1414
await this.dataSeedDemoWorkspaceService.seedDemo();
1515
}

packages/twenty-server/src/engine/api/graphql/workspace-query-runner/jobs/call-webhook-jobs.job.ts

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
import { Injectable, Logger } from '@nestjs/common';
1+
import { Logger } from '@nestjs/common';
22

3-
import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';
43
import { ObjectMetadataInterface } from 'src/engine/metadata-modules/field-metadata/interfaces/object-metadata.interface';
54

65
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
@@ -12,6 +11,8 @@ import {
1211
CallWebhookJobData,
1312
} from 'src/engine/api/graphql/workspace-query-runner/jobs/call-webhook.job';
1413
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
14+
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
15+
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
1516

1617
export enum CallWebhookJobsJobOperation {
1718
create = 'create',
@@ -26,10 +27,8 @@ export type CallWebhookJobsJobData = {
2627
operation: CallWebhookJobsJobOperation;
2728
};
2829

29-
@Injectable()
30-
export class CallWebhookJobsJob
31-
implements MessageQueueJob<CallWebhookJobsJobData>
32-
{
30+
@Processor(MessageQueue.webhookQueue)
31+
export class CallWebhookJobsJob {
3332
private readonly logger = new Logger(CallWebhookJobsJob.name);
3433

3534
constructor(
@@ -39,6 +38,7 @@ export class CallWebhookJobsJob
3938
private readonly messageQueueService: MessageQueueService,
4039
) {}
4140

41+
@Process()
4242
async handle(data: CallWebhookJobsJobData): Promise<void> {
4343
const dataSourceMetadata =
4444
await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail(

packages/twenty-server/src/engine/api/graphql/workspace-query-runner/jobs/call-webhook.job.ts

+7-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1-
import { Injectable, Logger } from '@nestjs/common';
1+
import { Logger } from '@nestjs/common';
22
import { HttpService } from '@nestjs/axios';
33

4-
import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';
4+
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
5+
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
6+
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
57

68
export type CallWebhookJobData = {
79
targetUrl: string;
@@ -13,12 +15,13 @@ export type CallWebhookJobData = {
1315
record: any;
1416
};
1517

16-
@Injectable()
17-
export class CallWebhookJob implements MessageQueueJob<CallWebhookJobData> {
18+
@Processor(MessageQueue.webhookQueue)
19+
export class CallWebhookJob {
1820
private readonly logger = new Logger(CallWebhookJob.name);
1921

2022
constructor(private readonly httpService: HttpService) {}
2123

24+
@Process()
2225
async handle(data: CallWebhookJobData): Promise<void> {
2326
try {
2427
await this.httpService.axiosRef.post(data.targetUrl, data);

packages/twenty-server/src/engine/api/graphql/workspace-query-runner/jobs/record-position-backfill.job.ts

+6-8
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,20 @@
1-
import { Injectable } from '@nestjs/common';
2-
3-
import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';
4-
51
import { RecordPositionBackfillService } from 'src/engine/api/graphql/workspace-query-runner/services/record-position-backfill-service';
2+
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
3+
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
4+
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
65

76
export type RecordPositionBackfillJobData = {
87
workspaceId: string;
98
dryRun: boolean;
109
};
1110

12-
@Injectable()
13-
export class RecordPositionBackfillJob
14-
implements MessageQueueJob<RecordPositionBackfillJobData>
15-
{
11+
@Processor(MessageQueue.recordPositionBackfillQueue)
12+
export class RecordPositionBackfillJob {
1613
constructor(
1714
private readonly recordPositionBackfillService: RecordPositionBackfillService,
1815
) {}
1916

17+
@Process()
2018
async handle(data: RecordPositionBackfillJobData): Promise<void> {
2119
this.recordPositionBackfillService.backfill(data.workspaceId, data.dryRun);
2220
}

packages/twenty-server/src/engine/api/graphql/workspace-query-runner/jobs/workspace-query-runner-job.module.ts

+1-14
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,6 @@ import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/works
1515
RecordPositionBackfillModule,
1616
HttpModule,
1717
],
18-
providers: [
19-
{
20-
provide: CallWebhookJobsJob.name,
21-
useClass: CallWebhookJobsJob,
22-
},
23-
{
24-
provide: CallWebhookJob.name,
25-
useClass: CallWebhookJob,
26-
},
27-
{
28-
provide: RecordPositionBackfillJob.name,
29-
useClass: RecordPositionBackfillJob,
30-
},
31-
],
18+
providers: [CallWebhookJobsJob, CallWebhookJob, RecordPositionBackfillJob],
3219
})
3320
export class WorkspaceQueryRunnerJobModule {}

packages/twenty-server/src/engine/core-modules/billing/jobs/update-subscription.job.ts

+8-7
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
1-
import { Injectable, Logger } from '@nestjs/common';
2-
3-
import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';
1+
import { Logger } from '@nestjs/common';
42

53
import { BillingService } from 'src/engine/core-modules/billing/billing.service';
64
import { UserWorkspaceService } from 'src/engine/core-modules/user-workspace/user-workspace.service';
75
import { StripeService } from 'src/engine/core-modules/billing/stripe/stripe.service';
6+
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
7+
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
8+
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
89
export type UpdateSubscriptionJobData = { workspaceId: string };
9-
@Injectable()
10-
export class UpdateSubscriptionJob
11-
implements MessageQueueJob<UpdateSubscriptionJobData>
12-
{
10+
11+
@Processor(MessageQueue.billingQueue)
12+
export class UpdateSubscriptionJob {
1313
protected readonly logger = new Logger(UpdateSubscriptionJob.name);
1414

1515
constructor(
@@ -18,6 +18,7 @@ export class UpdateSubscriptionJob
1818
private readonly stripeService: StripeService,
1919
) {}
2020

21+
@Process()
2122
async handle(data: UpdateSubscriptionJobData): Promise<void> {
2223
const workspaceMembersCount =
2324
await this.userWorkspaceService.getWorkspaceMemberCount(data.workspaceId);

packages/twenty-server/src/engine/core-modules/core-engine.module.ts

-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import { TimelineCalendarEventModule } from 'src/engine/core-modules/calendar/ti
1111
import { BillingModule } from 'src/engine/core-modules/billing/billing.module';
1212
import { HealthModule } from 'src/engine/core-modules/health/health.module';
1313
import { TwentyORMModule } from 'src/engine/twenty-orm/twenty-orm.module';
14-
import { ShareContextTestModule } from 'src/engine/core-modules/share-context-test/share-context-test.module';
1514

1615
import { AnalyticsModule } from './analytics/analytics.module';
1716
import { FileModule } from './file/file.module';
@@ -35,7 +34,6 @@ import { ClientConfigModule } from './client-config/client-config.module';
3534
TimelineCalendarEventModule,
3635
UserModule,
3736
WorkspaceModule,
38-
ShareContextTestModule,
3937
],
4038
exports: [
4139
AnalyticsModule,

packages/twenty-server/src/engine/core-modules/share-context-test/share-context-test.module.ts

-15
This file was deleted.

packages/twenty-server/src/engine/core-modules/share-context-test/shared.service.ts

-17
This file was deleted.

packages/twenty-server/src/engine/core-modules/share-context-test/test.job.ts

-38
This file was deleted.

packages/twenty-server/src/engine/core-modules/share-context-test/test.listener.ts

-34
This file was deleted.

packages/twenty-server/src/engine/core-modules/share-context-test/test.resolver.ts

-13
This file was deleted.

packages/twenty-server/src/engine/core-modules/workspace/handle-workspace-member-deleted.job.ts

+7-8
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,18 @@
1-
import { Injectable } from '@nestjs/common';
2-
3-
import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';
4-
51
import { WorkspaceService } from 'src/engine/core-modules/workspace/services/workspace.service';
2+
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
3+
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
4+
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
65

76
export type HandleWorkspaceMemberDeletedJobData = {
87
workspaceId: string;
98
userId: string;
109
};
11-
@Injectable()
12-
export class HandleWorkspaceMemberDeletedJob
13-
implements MessageQueueJob<HandleWorkspaceMemberDeletedJobData>
14-
{
10+
11+
@Processor(MessageQueue.workspaceQueue)
12+
export class HandleWorkspaceMemberDeletedJob {
1513
constructor(private readonly workspaceService: WorkspaceService) {}
1614

15+
@Process()
1716
async handle(data: HandleWorkspaceMemberDeletedJobData): Promise<void> {
1817
const { workspaceId, userId } = data;
1918

packages/twenty-server/src/engine/integrations/email/email-sender.job.ts

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
1-
import { Injectable } from '@nestjs/common';
2-
31
import { SendMailOptions } from 'nodemailer';
42

5-
import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';
6-
73
import { EmailSenderService } from 'src/engine/integrations/email/email-sender.service';
4+
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
5+
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
6+
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
87

9-
@Injectable()
10-
export class EmailSenderJob implements MessageQueueJob<SendMailOptions> {
8+
@Processor(MessageQueue.emailQueue)
9+
export class EmailSenderJob {
1110
constructor(private readonly emailSenderService: EmailSenderService) {}
1211

12+
@Process()
1313
async handle(data: SendMailOptions): Promise<void> {
1414
await this.emailSenderService.send(data);
1515
}

packages/twenty-server/src/engine/integrations/integrations.module.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import { MessageQueueModule } from './message-queue/message-queue.module';
3030
useFactory: loggerModuleFactory,
3131
inject: [EnvironmentService],
3232
}),
33-
MessageQueueModule.forRoot({
33+
MessageQueueModule.registerAsync({
3434
useFactory: messageQueueModuleFactory,
3535
inject: [EnvironmentService],
3636
}),

0 commit comments

Comments
 (0)