Skip to content

Commit 8c8f192

Browse files
authored
Trigger workflow on database event (#6480)
- Add global listener on database event - Fetch event listener associated - Trigger associated workflow Also updated the runner so it expects the input to be in the payload rather than the trigger
1 parent ae423f5 commit 8c8f192

15 files changed

+271
-39
lines changed

packages/twenty-server/src/engine/core-modules/core-engine.module.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import { TimelineMessagingModule } from 'src/engine/core-modules/messaging/timel
1111
import { OpenApiModule } from 'src/engine/core-modules/open-api/open-api.module';
1212
import { PostgresCredentialsModule } from 'src/engine/core-modules/postgres-credentials/postgres-credentials.module';
1313
import { UserModule } from 'src/engine/core-modules/user/user.module';
14-
import { WorkflowTriggerModule } from 'src/engine/core-modules/workflow/workflow-trigger.module';
14+
import { WorkflowTriggerCoreModule } from 'src/engine/core-modules/workflow/core-workflow-trigger.module';
1515
import { WorkspaceModule } from 'src/engine/core-modules/workspace/workspace.module';
1616

1717
import { AnalyticsModule } from './analytics/analytics.module';
@@ -35,7 +35,7 @@ import { FileModule } from './file/file.module';
3535
WorkspaceModule,
3636
AISQLQueryModule,
3737
PostgresCredentialsModule,
38-
WorkflowTriggerModule,
38+
WorkflowTriggerCoreModule,
3939
],
4040
exports: [
4141
AnalyticsModule,

packages/twenty-server/src/engine/core-modules/workflow/workflow-trigger.module.ts renamed to packages/twenty-server/src/engine/core-modules/workflow/core-workflow-trigger.module.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@ import { Module } from '@nestjs/common';
22

33
import { WorkflowTriggerResolver } from 'src/engine/core-modules/workflow/workflow-trigger.resolver';
44
import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module';
5-
import { WorkflowTriggerService } from 'src/modules/workflow/workflow-trigger/workflow-trigger.service';
65
import { WorkflowRunnerModule } from 'src/modules/workflow/workflow-runner/workflow-runner.module';
6+
import { WorkflowTriggerService } from 'src/modules/workflow/workflow-trigger/workflow-trigger.service';
77

88
@Module({
99
imports: [WorkflowCommonModule, WorkflowRunnerModule],
1010
providers: [WorkflowTriggerService, WorkflowTriggerResolver],
1111
})
12-
export class WorkflowTriggerModule {}
12+
export class WorkflowTriggerCoreModule {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import { Field, InputType } from '@nestjs/graphql';
2+
3+
import graphqlTypeJson from 'graphql-type-json';
4+
5+
@InputType()
6+
export class RunWorkflowVersionInput {
7+
@Field(() => String, {
8+
description: 'Workflow version ID',
9+
nullable: false,
10+
})
11+
workflowVersionId: string;
12+
13+
@Field(() => graphqlTypeJson, {
14+
description: 'Execution result in JSON format',
15+
nullable: true,
16+
})
17+
payload?: JSON;
18+
}

packages/twenty-server/src/engine/core-modules/workflow/workflow-trigger.resolver.ts

+6-4
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
import { UseGuards } from '@nestjs/common';
22
import { Args, Mutation, Resolver } from '@nestjs/graphql';
33

4+
import { RunWorkflowVersionInput } from 'src/engine/core-modules/workflow/dtos/run-workflow-version-input.dto';
5+
import { WorkflowTriggerResultDTO } from 'src/engine/core-modules/workflow/dtos/workflow-trigger-result.dto';
46
import { workflowTriggerGraphqlApiExceptionHandler } from 'src/engine/core-modules/workflow/utils/workflow-trigger-graphql-api-exception-handler.util';
57
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
68
import { AuthWorkspace } from 'src/engine/decorators/auth/auth-workspace.decorator';
79
import { JwtAuthGuard } from 'src/engine/guards/jwt.auth.guard';
810
import { WorkflowTriggerService } from 'src/modules/workflow/workflow-trigger/workflow-trigger.service';
9-
import { WorkflowTriggerResultDTO } from 'src/engine/core-modules/workflow/dtos/workflow-trigger-result.dto';
1011

1112
@UseGuards(JwtAuthGuard)
1213
@Resolver()
@@ -31,15 +32,16 @@ export class WorkflowTriggerResolver {
3132
}
3233

3334
@Mutation(() => WorkflowTriggerResultDTO)
34-
async triggerWorkflow(
35+
async runWorkflowVersion(
3536
@AuthWorkspace() { id: workspaceId }: Workspace,
36-
@Args('workflowVersionId') workflowVersionId: string,
37+
@Args('input') { workflowVersionId, payload }: RunWorkflowVersionInput,
3738
) {
3839
try {
3940
return {
40-
result: await this.workflowTriggerService.runWorkflow(
41+
result: await this.workflowTriggerService.runWorkflowVersion(
4142
workspaceId,
4243
workflowVersionId,
44+
payload ?? {},
4345
),
4446
};
4547
} catch (error) {

packages/twenty-server/src/engine/integrations/message-queue/jobs.module.ts

+2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import { AutoCompaniesAndContactsCreationJobModule } from 'src/modules/contact-c
2323
import { MessagingModule } from 'src/modules/messaging/messaging.module';
2424
import { TimelineJobModule } from 'src/modules/timeline/jobs/timeline-job.module';
2525
import { TimelineActivityModule } from 'src/modules/timeline/timeline-activity.module';
26+
import { WorkflowModule } from 'src/modules/workflow/workflow.module';
2627

2728
@Module({
2829
imports: [
@@ -43,6 +44,7 @@ import { TimelineActivityModule } from 'src/modules/timeline/timeline-activity.m
4344
WorkspaceQueryRunnerJobModule,
4445
AutoCompaniesAndContactsCreationJobModule,
4546
TimelineJobModule,
47+
WorkflowModule,
4648
],
4749
providers: [
4850
CleanInactiveWorkspaceJob,

packages/twenty-server/src/modules/workflow/common/types/workflow-trigger.type.ts

+1-8
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import { WorkflowAction } from 'src/modules/workflow/common/types/workflow-actio
22

33
export enum WorkflowTriggerType {
44
DATABASE_EVENT = 'DATABASE_EVENT',
5-
MANUAL = 'MANUAL',
65
}
76

87
type BaseTrigger = {
@@ -19,10 +18,4 @@ export type WorkflowDatabaseEventTrigger = BaseTrigger & {
1918
};
2019
};
2120

22-
type WorkflowManualTrigger = BaseTrigger & {
23-
type: WorkflowTriggerType.MANUAL;
24-
};
25-
26-
export type WorkflowTrigger =
27-
| WorkflowDatabaseEventTrigger
28-
| WorkflowManualTrigger;
21+
export type WorkflowTrigger = WorkflowDatabaseEventTrigger;

packages/twenty-server/src/modules/workflow/common/workflow-common.services.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@ import { Injectable } from '@nestjs/common';
22

33
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
44
import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity';
5+
import { WorkflowTrigger } from 'src/modules/workflow/common/types/workflow-trigger.type';
56
import {
67
WorkflowTriggerException,
78
WorkflowTriggerExceptionCode,
89
} from 'src/modules/workflow/workflow-trigger/workflow-trigger.exception';
9-
import { WorkflowTrigger } from 'src/modules/workflow/common/types/workflow-trigger.type';
1010

1111
@Injectable()
1212
export class WorkflowCommonService {
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,54 @@
1+
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
12
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
23
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
3-
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
4-
import { WorkflowRunnerService } from 'src/modules/workflow/workflow-runner/workflow-runner.service';
4+
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
5+
import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity';
56
import { WorkflowCommonService } from 'src/modules/workflow/common/workflow-common.services';
7+
import { WorkflowRunnerService } from 'src/modules/workflow/workflow-runner/workflow-runner.service';
68

7-
type RunWorkflowJobData = { workspaceId: string; workflowVersionId: string };
9+
export type RunWorkflowJobData = {
10+
workspaceId: string;
11+
workflowId: string;
12+
payload: object;
13+
};
814

915
@Processor(MessageQueue.workflowQueue)
1016
export class WorkflowRunnerJob {
1117
constructor(
1218
private readonly workflowCommonService: WorkflowCommonService,
1319
private readonly workflowRunnerService: WorkflowRunnerService,
20+
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
1421
) {}
1522

1623
@Process(WorkflowRunnerJob.name)
1724
async handle({
1825
workspaceId,
19-
workflowVersionId,
26+
workflowId,
27+
payload,
2028
}: RunWorkflowJobData): Promise<void> {
29+
const workflowRepository =
30+
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowWorkspaceEntity>(
31+
workspaceId,
32+
'workflow',
33+
);
34+
35+
const workflow = await workflowRepository.findOneByOrFail({
36+
id: workflowId,
37+
});
38+
39+
if (!workflow.publishedVersionId) {
40+
throw new Error('Workflow has no published version');
41+
}
42+
2143
const workflowVersion = await this.workflowCommonService.getWorkflowVersion(
2244
workspaceId,
23-
workflowVersionId,
45+
workflow.publishedVersionId,
2446
);
2547

2648
await this.workflowRunnerService.run({
2749
action: workflowVersion.trigger.nextAction,
2850
workspaceId,
29-
payload: workflowVersion.trigger.input,
51+
payload,
3052
});
3153
}
3254
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import { Logger } from '@nestjs/common';
2+
3+
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
4+
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
5+
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
6+
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
7+
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
8+
import {
9+
RunWorkflowJobData,
10+
WorkflowRunnerJob,
11+
} from 'src/modules/workflow/workflow-runner/workflow-runner.job';
12+
13+
export type WorkflowEventTriggerJobData = {
14+
workspaceId: string;
15+
workflowId: string;
16+
payload: object;
17+
};
18+
19+
@Processor(MessageQueue.workflowQueue)
20+
export class WorkflowEventTriggerJob {
21+
private readonly logger = new Logger(WorkflowEventTriggerJob.name);
22+
23+
constructor(
24+
@InjectMessageQueue(MessageQueue.workflowQueue)
25+
private readonly messageQueueService: MessageQueueService,
26+
) {}
27+
28+
@Process(WorkflowEventTriggerJob.name)
29+
async handle(data: WorkflowEventTriggerJobData): Promise<void> {
30+
this.messageQueueService.add<RunWorkflowJobData>(WorkflowRunnerJob.name, {
31+
workspaceId: data.workspaceId,
32+
workflowId: data.workflowId,
33+
payload: data.payload,
34+
});
35+
}
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import { Module } from '@nestjs/common';
2+
3+
import { WorkflowRunnerModule } from 'src/modules/workflow/workflow-runner/workflow-runner.module';
4+
import { WorkflowEventTriggerJob } from 'src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job';
5+
6+
@Module({
7+
imports: [WorkflowRunnerModule],
8+
providers: [WorkflowEventTriggerJob],
9+
})
10+
export class WorkflowTriggerJobModule {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
import { Injectable, Logger } from '@nestjs/common';
2+
import { OnEvent } from '@nestjs/event-emitter';
3+
4+
import { FeatureFlagKey } from 'src/engine/core-modules/feature-flag/enums/feature-flag-key.enum';
5+
import { IsFeatureEnabledService } from 'src/engine/core-modules/feature-flag/services/is-feature-enabled.service';
6+
import { ObjectRecordCreateEvent } from 'src/engine/integrations/event-emitter/types/object-record-create.event';
7+
import { ObjectRecordDeleteEvent } from 'src/engine/integrations/event-emitter/types/object-record-delete.event';
8+
import { ObjectRecordUpdateEvent } from 'src/engine/integrations/event-emitter/types/object-record-update.event';
9+
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
10+
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
11+
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
12+
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
13+
import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-event-listener.workspace-entity';
14+
import {
15+
WorkflowEventTriggerJob,
16+
WorkflowEventTriggerJobData,
17+
} from 'src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job';
18+
19+
@Injectable()
20+
export class DatabaseEventTriggerListener {
21+
private readonly logger = new Logger('DatabaseEventTriggerListener');
22+
23+
constructor(
24+
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
25+
@InjectMessageQueue(MessageQueue.workflowQueue)
26+
private readonly messageQueueService: MessageQueueService,
27+
private readonly isFeatureFlagEnabledService: IsFeatureEnabledService,
28+
) {}
29+
30+
@OnEvent('*.created')
31+
async handleObjectRecordCreateEvent(payload: ObjectRecordCreateEvent<any>) {
32+
await this.handleEvent(payload);
33+
}
34+
35+
@OnEvent('*.updated')
36+
async handleObjectRecordUpdateEvent(payload: ObjectRecordUpdateEvent<any>) {
37+
await this.handleEvent(payload);
38+
}
39+
40+
@OnEvent('*.deleted')
41+
async handleObjectRecordDeleteEvent(payload: ObjectRecordDeleteEvent<any>) {
42+
await this.handleEvent(payload);
43+
}
44+
45+
private async handleEvent(
46+
payload:
47+
| ObjectRecordCreateEvent<any>
48+
| ObjectRecordUpdateEvent<any>
49+
| ObjectRecordDeleteEvent<any>,
50+
) {
51+
const workspaceId = payload.workspaceId;
52+
const eventName = payload.name;
53+
54+
if (!workspaceId || !eventName) {
55+
this.logger.error(
56+
`Missing workspaceId or eventName in payload ${JSON.stringify(
57+
payload,
58+
)}`,
59+
);
60+
61+
return;
62+
}
63+
64+
const isWorkflowEnabled =
65+
await this.isFeatureFlagEnabledService.isFeatureEnabled(
66+
FeatureFlagKey.IsWorkflowEnabled,
67+
workspaceId,
68+
);
69+
70+
if (!isWorkflowEnabled) {
71+
return;
72+
}
73+
74+
const workflowEventListenerRepository =
75+
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowEventListenerWorkspaceEntity>(
76+
workspaceId,
77+
'workflowEventListener',
78+
);
79+
80+
const eventListeners = await workflowEventListenerRepository.find({
81+
where: {
82+
eventName,
83+
},
84+
});
85+
86+
for (const eventListener of eventListeners) {
87+
this.messageQueueService.add<WorkflowEventTriggerJobData>(
88+
WorkflowEventTriggerJob.name,
89+
{
90+
workspaceId,
91+
workflowId: eventListener.workflowId,
92+
payload,
93+
},
94+
{ retryLimit: 3 },
95+
);
96+
}
97+
}
98+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import { Module } from '@nestjs/common';
2+
3+
import { FeatureFlagModule } from 'src/engine/core-modules/feature-flag/feature-flag.module';
4+
import { DatabaseEventTriggerListener } from 'src/modules/workflow/workflow-trigger/listeners/database-event-trigger.listener';
5+
6+
@Module({
7+
imports: [FeatureFlagModule],
8+
providers: [DatabaseEventTriggerListener],
9+
})
10+
export class WorkflowTriggerListenerModule {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import { Module } from '@nestjs/common';
2+
3+
import { WorkflowTriggerJobModule } from 'src/modules/workflow/workflow-trigger/jobs/workflow-trigger-job.module';
4+
import { WorkflowTriggerListenerModule } from 'src/modules/workflow/workflow-trigger/listeners/workflow-trigger-listener.module';
5+
6+
@Module({
7+
imports: [WorkflowTriggerJobModule, WorkflowTriggerListenerModule],
8+
})
9+
export class WorkflowTriggerModule {}

0 commit comments

Comments
 (0)