Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Enhancements to MessageQueue Module with Decorators #5657

Merged
merged 18 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/twenty-server/@types/express.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ declare module 'express-serve-static-core' {
interface Request {
user?: User;
workspace?: Workspace;
workspaceId?: string;
cacheVersion?: string | null;
}
}
2 changes: 2 additions & 0 deletions packages/twenty-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"jsdom": "~22.1.0",
"jwt-decode": "^4.0.0",
"lodash.differencewith": "^4.5.0",
"lodash.omitby": "^4.6.0",
"lodash.uniq": "^4.5.0",
"lodash.uniqby": "^4.7.0",
"passport": "^0.7.0",
Expand All @@ -40,6 +41,7 @@
"@types/lodash.isequal": "^4.5.8",
"@types/lodash.isobject": "^3.0.7",
"@types/lodash.omit": "^4.5.9",
"@types/lodash.omitby": "^4.6.9",
"@types/lodash.snakecase": "^4.1.7",
"@types/lodash.uniq": "^4.5.9",
"@types/lodash.uniqby": "^4.7.9",
Expand Down
11 changes: 10 additions & 1 deletion packages/twenty-server/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import { GraphQLConfigModule } from 'src/engine/api/graphql/graphql-config/graph
import { GraphQLConfigService } from 'src/engine/api/graphql/graphql-config/graphql-config.service';
import { WorkspaceCacheVersionModule } from 'src/engine/metadata-modules/workspace-cache-version/workspace-cache-version.module';
import { GraphQLHydrateRequestFromTokenMiddleware } from 'src/engine/middlewares/graphql-hydrate-request-from-token.middleware';
import { MessageQueueModule } from 'src/engine/integrations/message-queue/message-queue.module';
import { MessageQueueDriverType } from 'src/engine/integrations/message-queue/interfaces';

import { CoreEngineModule } from './engine/core-modules/core-engine.module';
import { IntegrationsModule } from './engine/integrations/integrations.module';
import { CoreEngineModule } from './engine/core-modules/core-engine.module';

@Module({
imports: [
Expand Down Expand Up @@ -72,6 +74,13 @@ export class AppModule {
);
}

// Messaque Queue explorer only for sync driver
// Maybe we don't need to conditionaly register the explorer, because we're creating a jobs module
// that will expose classes that are only used in the queue worker
magrinj marked this conversation as resolved.
Show resolved Hide resolved
if (process.env.MESSAGE_QUEUE_TYPE === MessageQueueDriverType.Sync) {
modules.push(MessageQueueModule.registerExplorer());
}

return modules;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { Inject } from '@nestjs/common';

import { Command, CommandRunner } from 'nest-commander';

import { dataSeedDemoWorkspaceCronPattern } from 'src/database/commands/data-seed-demo-workspace/crons/data-seed-demo-workspace-cron-pattern';
import { DataSeedDemoWorkspaceJob } from 'src/database/commands/data-seed-demo-workspace/jobs/data-seed-demo-workspace.job';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';

Expand All @@ -13,7 +12,7 @@ import { MessageQueueService } from 'src/engine/integrations/message-queue/servi
})
export class StartDataSeedDemoWorkspaceCronCommand extends CommandRunner {
constructor(
@Inject(MessageQueue.cronQueue)
@InjectMessageQueue(MessageQueue.cronQueue)
private readonly messageQueueService: MessageQueueService,
) {
super();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { Inject } from '@nestjs/common';

import { Command, CommandRunner } from 'nest-commander';

import { dataSeedDemoWorkspaceCronPattern } from 'src/database/commands/data-seed-demo-workspace/crons/data-seed-demo-workspace-cron-pattern';
import { DataSeedDemoWorkspaceJob } from 'src/database/commands/data-seed-demo-workspace/jobs/data-seed-demo-workspace.job';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';

Expand All @@ -13,7 +12,7 @@ import { MessageQueueService } from 'src/engine/integrations/message-queue/servi
})
export class StopDataSeedDemoWorkspaceCronCommand extends CommandRunner {
constructor(
@Inject(MessageQueue.cronQueue)
@InjectMessageQueue(MessageQueue.cronQueue)
private readonly messageQueueService: MessageQueueService,
) {
super();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import { Injectable } from '@nestjs/common';

import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';

import { DataSeedDemoWorkspaceService } from 'src/database/commands/data-seed-demo-workspace/services/data-seed-demo-workspace.service';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';

@Injectable()
export class DataSeedDemoWorkspaceJob implements MessageQueueJob<undefined> {
@Processor(MessageQueue.cronQueue)
export class DataSeedDemoWorkspaceJob {
constructor(
private readonly dataSeedDemoWorkspaceService: DataSeedDemoWorkspaceService,
) {}

@Process(DataSeedDemoWorkspaceJob.name)
async handle(): Promise<void> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For future improvement, maybe @process() could automatically use the class name directly if not overwritten? I feel like most of the time we will have 1 class per job and always use the handle method

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll have to think a bit about this, I'm not 100% sure it's the good way to go

await this.dataSeedDemoWorkspaceService.seedDemo();
}
Expand Down
4 changes: 4 additions & 0 deletions packages/twenty-server/src/database/typeorm/typeorm.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { TypeOrmModule, TypeOrmModuleOptions } from '@nestjs/typeorm';

import { typeORMCoreModuleOptions } from 'src/database/typeorm/core/core.datasource';
import { EnvironmentModule } from 'src/engine/integrations/environment/environment.module';
import { TwentyORMModule } from 'src/engine/twenty-orm/twenty-orm.module';

import { TypeORMService } from './typeorm.service';

Expand All @@ -28,6 +29,9 @@ const coreTypeORMFactory = async (): Promise<TypeOrmModuleOptions> => ({
useFactory: coreTypeORMFactory,
name: 'core',
}),
TwentyORMModule.register({
workspaceEntities: ['dist/src/**/*.workspace-entity{.ts,.js}'],
}),
EnvironmentModule,
],
providers: [TypeORMService],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { Inject } from '@nestjs/common';

import { Command, CommandRunner, Option } from 'nest-commander';

import {
RecordPositionBackfillJob,
RecordPositionBackfillJobData,
} from 'src/engine/api/graphql/workspace-query-runner/jobs/record-position-backfill.job';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';

Expand All @@ -20,7 +19,7 @@ export type RecordPositionBackfillCommandOptions = {
})
export class RecordPositionBackfillCommand extends CommandRunner {
constructor(
@Inject(MessageQueue.recordPositionBackfillQueue)
@InjectMessageQueue(MessageQueue.recordPositionBackfillQueue)
private readonly messageQueueService: MessageQueueService,
) {
super();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { Inject, Injectable, Logger } from '@nestjs/common';
import { Logger } from '@nestjs/common';

import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';
import { ObjectMetadataInterface } from 'src/engine/metadata-modules/field-metadata/interfaces/object-metadata.interface';

import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
Expand All @@ -11,6 +10,9 @@ import {
CallWebhookJob,
CallWebhookJobData,
} from 'src/engine/api/graphql/workspace-query-runner/jobs/call-webhook.job';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';

export enum CallWebhookJobsJobOperation {
create = 'create',
Expand All @@ -25,19 +27,18 @@ export type CallWebhookJobsJobData = {
operation: CallWebhookJobsJobOperation;
};

@Injectable()
export class CallWebhookJobsJob
implements MessageQueueJob<CallWebhookJobsJobData>
{
@Processor(MessageQueue.webhookQueue)
export class CallWebhookJobsJob {
private readonly logger = new Logger(CallWebhookJobsJob.name);

constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
private readonly dataSourceService: DataSourceService,
@Inject(MessageQueue.webhookQueue)
@InjectMessageQueue(MessageQueue.webhookQueue)
private readonly messageQueueService: MessageQueueService,
) {}

@Process(CallWebhookJobsJob.name)
async handle(data: CallWebhookJobsJobData): Promise<void> {
const dataSourceMetadata =
await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { Injectable, Logger } from '@nestjs/common';
import { Logger } from '@nestjs/common';
import { HttpService } from '@nestjs/axios';

import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';

export type CallWebhookJobData = {
targetUrl: string;
Expand All @@ -13,12 +15,13 @@ export type CallWebhookJobData = {
record: any;
};

@Injectable()
export class CallWebhookJob implements MessageQueueJob<CallWebhookJobData> {
@Processor(MessageQueue.webhookQueue)
export class CallWebhookJob {
private readonly logger = new Logger(CallWebhookJob.name);

constructor(private readonly httpService: HttpService) {}

@Process(CallWebhookJob.name)
async handle(data: CallWebhookJobData): Promise<void> {
try {
await this.httpService.axiosRef.post(data.targetUrl, data);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
import { Injectable } from '@nestjs/common';

import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';

import { RecordPositionBackfillService } from 'src/engine/api/graphql/workspace-query-runner/services/record-position-backfill-service';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';

export type RecordPositionBackfillJobData = {
workspaceId: string;
dryRun: boolean;
};

@Injectable()
export class RecordPositionBackfillJob
implements MessageQueueJob<RecordPositionBackfillJobData>
{
@Processor(MessageQueue.recordPositionBackfillQueue)
export class RecordPositionBackfillJob {
constructor(
private readonly recordPositionBackfillService: RecordPositionBackfillService,
) {}

@Process(RecordPositionBackfillJob.name)
async handle(data: RecordPositionBackfillJobData): Promise<void> {
this.recordPositionBackfillService.backfill(data.workspaceId, data.dryRun);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,6 @@ import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/works
RecordPositionBackfillModule,
HttpModule,
],
providers: [
{
provide: CallWebhookJobsJob.name,
useClass: CallWebhookJobsJob,
},
{
provide: CallWebhookJob.name,
useClass: CallWebhookJob,
},
{
provide: RecordPositionBackfillJob.name,
useClass: RecordPositionBackfillJob,
},
],
providers: [CallWebhookJobsJob, CallWebhookJob, RecordPositionBackfillJob],
})
export class WorkspaceQueryRunnerJobModule {}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Inject, Injectable } from '@nestjs/common';
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';

import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
Expand All @@ -8,12 +8,13 @@ import { objectRecordChangedValues } from 'src/engine/integrations/event-emitter
import { ObjectRecordUpdateEvent } from 'src/engine/integrations/event-emitter/types/object-record-update.event';
import { ObjectRecordBaseEvent } from 'src/engine/integrations/event-emitter/types/object-record.base.event';
import { UpsertTimelineActivityFromInternalEvent } from 'src/modules/timeline/jobs/upsert-timeline-activity-from-internal-event.job';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { CreateAuditLogFromInternalEvent } from 'src/modules/timeline/jobs/create-audit-log-from-internal-event';

@Injectable()
export class EntityEventsToDbListener {
constructor(
@Inject(MessageQueue.entityEventsToDbQueue)
@InjectMessageQueue(MessageQueue.entityEventsToDbQueue)
private readonly messageQueueService: MessageQueueService,
) {}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import {
BadRequestException,
Inject,
Injectable,
Logger,
RequestTimeoutException,
Expand Down Expand Up @@ -52,6 +51,7 @@ import { assertMutationNotOnRemoteObject } from 'src/engine/metadata-modules/obj
import { STANDARD_OBJECT_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids';
import { assertIsValidUuid } from 'src/engine/api/graphql/workspace-query-runner/utils/assert-is-valid-uuid.util';
import { isQueryTimeoutError } from 'src/engine/utils/query-timeout.util';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';

import { WorkspaceQueryRunnerOptions } from './interfaces/query-runner-option.interface';
import {
Expand All @@ -72,7 +72,7 @@ export class WorkspaceQueryRunnerService {
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
private readonly queryRunnerArgsFactory: QueryRunnerArgsFactory,
private readonly queryResultGettersFactory: QueryResultGettersFactory,
@Inject(MessageQueue.webhookQueue)
@InjectMessageQueue(MessageQueue.webhookQueue)
private readonly messageQueueService: MessageQueueService,
private readonly eventEmitter: EventEmitter2,
private readonly workspacePreQueryHookService: WorkspacePreQueryHookService,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Injectable, Inject } from '@nestjs/common';
import { Injectable } from '@nestjs/common';

import { EntityManager } from 'typeorm';
import { v4 } from 'uuid';
Expand Down Expand Up @@ -34,15 +34,16 @@ import {
MessagingMessageListFetchJob,
MessagingMessageListFetchJobData,
} from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';

@Injectable()
export class GoogleAPIsService {
constructor(
private readonly dataSourceService: DataSourceService,
private readonly typeORMService: TypeORMService,
@Inject(MessageQueue.messagingQueue)
@InjectMessageQueue(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
@Inject(MessageQueue.calendarQueue)
@InjectMessageQueue(MessageQueue.calendarQueue)
private readonly calendarQueueService: MessageQueueService,
private readonly environmentService: EnvironmentService,
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
import { Injectable, Logger } from '@nestjs/common';

import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';
import { Logger } from '@nestjs/common';

magrinj marked this conversation as resolved.
Show resolved Hide resolved
import { BillingService } from 'src/engine/core-modules/billing/billing.service';
import { UserWorkspaceService } from 'src/engine/core-modules/user-workspace/user-workspace.service';
import { StripeService } from 'src/engine/core-modules/billing/stripe/stripe.service';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
export type UpdateSubscriptionJobData = { workspaceId: string };
@Injectable()
export class UpdateSubscriptionJob
implements MessageQueueJob<UpdateSubscriptionJobData>
{

@Processor(MessageQueue.billingQueue)
export class UpdateSubscriptionJob {
protected readonly logger = new Logger(UpdateSubscriptionJob.name);

constructor(
private readonly billingService: BillingService,
private readonly userWorkspaceService: UserWorkspaceService,
private readonly stripeService: StripeService,
) {}

@Process(UpdateSubscriptionJob.name)
async handle(data: UpdateSubscriptionJobData): Promise<void> {
const workspaceMembersCount =
await this.userWorkspaceService.getWorkspaceMemberCount(data.workspaceId);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Inject, Injectable } from '@nestjs/common';
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';

import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
Expand All @@ -10,11 +10,12 @@ import {
UpdateSubscriptionJobData,
} from 'src/engine/core-modules/billing/jobs/update-subscription.job';
import { EnvironmentService } from 'src/engine/integrations/environment/environment.service';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';

@Injectable()
export class BillingWorkspaceMemberListener {
constructor(
@Inject(MessageQueue.billingQueue)
@InjectMessageQueue(MessageQueue.billingQueue)
private readonly messageQueueService: MessageQueueService,
private readonly environmentService: EnvironmentService,
) {}
Expand Down
Loading
Loading