Skip to content

Commit c52a492

Browse files
authored
Add throttling on workflow execution (#9263)
We want to avoid infinite loops using workflows. Adding a throttler with a limit of 10 executions / sec by default for each workflow. We were not emitting events on workflow actions so loops could not happen. Since throttler is there we can now and these. Adding an error message so the user knows when it happens. <img width="1284" alt="Capture d’écran 2024-12-27 à 17 05 20" src="https://github.com/user-attachments/assets/dafa837b-5b4c-48be-8207-c90f5c71a236" />
1 parent ba2f55a commit c52a492

File tree

10 files changed

+213
-23
lines changed

10 files changed

+213
-23
lines changed

packages/twenty-front/src/modules/workflow/types/Workflow.ts

+1
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ type StepRunOutput = {
167167

168168
export type WorkflowRunOutput = {
169169
steps: Record<string, StepRunOutput>;
170+
error?: string;
170171
};
171172

172173
export type WorkflowRun = {

packages/twenty-server/src/engine/core-modules/environment/environment-variables.ts

+7
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,13 @@ export class EnvironmentVariables {
485485
@CastToPositiveNumber()
486486
SERVERLESS_FUNCTION_EXEC_THROTTLE_TTL = 1000;
487487

488+
@CastToPositiveNumber()
489+
WORKFLOW_EXEC_THROTTLE_LIMIT = 10;
490+
491+
// milliseconds
492+
@CastToPositiveNumber()
493+
WORKFLOW_EXEC_THROTTLE_TTL = 1000;
494+
488495
// SSL
489496
@IsString()
490497
@IsOptional()

packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts

+1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ type StepRunOutput = {
4646

4747
export type WorkflowRunOutput = {
4848
steps: Record<string, StepRunOutput>;
49+
error?: string;
4950
};
5051

5152
@WorkspaceEntity({

packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/create-record.workflow-action.ts

+55-1
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,31 @@
11
import { Injectable } from '@nestjs/common';
2+
import { InjectRepository } from '@nestjs/typeorm';
3+
4+
import { Repository } from 'typeorm';
25

36
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface';
47

8+
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
9+
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
10+
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
511
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
12+
import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter';
13+
import {
14+
RecordCRUDActionException,
15+
RecordCRUDActionExceptionCode,
16+
} from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/exceptions/record-crud-action.exception';
617
import { WorkflowCreateRecordActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/types/workflow-record-crud-action-input.type';
718
import { WorkflowActionResult } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-result.type';
819

920
@Injectable()
1021
export class CreateRecordWorkflowAction implements WorkflowAction {
11-
constructor(private readonly twentyORMManager: TwentyORMManager) {}
22+
constructor(
23+
private readonly twentyORMManager: TwentyORMManager,
24+
@InjectRepository(ObjectMetadataEntity, 'metadata')
25+
private readonly objectMetadataRepository: Repository<ObjectMetadataEntity>,
26+
private readonly workspaceEventEmitter: WorkspaceEventEmitter,
27+
private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory,
28+
) {}
1229

1330
async execute(
1431
workflowActionInput: WorkflowCreateRecordActionInput,
@@ -17,10 +34,47 @@ export class CreateRecordWorkflowAction implements WorkflowAction {
1734
workflowActionInput.objectName,
1835
);
1936

37+
const workspaceId = this.scopedWorkspaceContextFactory.create().workspaceId;
38+
39+
if (!workspaceId) {
40+
throw new RecordCRUDActionException(
41+
'Failed to create: Workspace ID is required',
42+
RecordCRUDActionExceptionCode.INVALID_REQUEST,
43+
);
44+
}
45+
46+
const objectMetadata = await this.objectMetadataRepository.findOne({
47+
where: {
48+
nameSingular: workflowActionInput.objectName,
49+
},
50+
});
51+
52+
if (!objectMetadata) {
53+
throw new RecordCRUDActionException(
54+
'Failed to create: Object metadata not found',
55+
RecordCRUDActionExceptionCode.INVALID_REQUEST,
56+
);
57+
}
58+
2059
const objectRecord = await repository.save(
2160
workflowActionInput.objectRecord,
2261
);
2362

63+
this.workspaceEventEmitter.emitDatabaseBatchEvent({
64+
objectMetadataNameSingular: workflowActionInput.objectName,
65+
action: DatabaseEventAction.CREATED,
66+
events: [
67+
{
68+
recordId: objectRecord.id,
69+
objectMetadata,
70+
properties: {
71+
after: objectRecord,
72+
},
73+
},
74+
],
75+
workspaceId,
76+
});
77+
2478
return {
2579
result: objectRecord,
2680
};

packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/delete-record.workflow-action.ts

+51-3
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,15 @@
11
import { Injectable } from '@nestjs/common';
2+
import { InjectRepository } from '@nestjs/typeorm';
3+
4+
import { Repository } from 'typeorm';
25

36
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface';
47

8+
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
9+
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
10+
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
511
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
12+
import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter';
613
import {
714
RecordCRUDActionException,
815
RecordCRUDActionExceptionCode,
@@ -12,7 +19,13 @@ import { WorkflowActionResult } from 'src/modules/workflow/workflow-executor/wor
1219

1320
@Injectable()
1421
export class DeleteRecordWorkflowAction implements WorkflowAction {
15-
constructor(private readonly twentyORMManager: TwentyORMManager) {}
22+
constructor(
23+
private readonly twentyORMManager: TwentyORMManager,
24+
@InjectRepository(ObjectMetadataEntity, 'metadata')
25+
private readonly objectMetadataRepository: Repository<ObjectMetadataEntity>,
26+
private readonly workspaceEventEmitter: WorkspaceEventEmitter,
27+
private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory,
28+
) {}
1629

1730
async execute(
1831
workflowActionInput: WorkflowDeleteRecordActionInput,
@@ -21,6 +34,28 @@ export class DeleteRecordWorkflowAction implements WorkflowAction {
2134
workflowActionInput.objectName,
2235
);
2336

37+
const workspaceId = this.scopedWorkspaceContextFactory.create().workspaceId;
38+
39+
if (!workspaceId) {
40+
throw new RecordCRUDActionException(
41+
'Failed to delete: Workspace ID is required',
42+
RecordCRUDActionExceptionCode.INVALID_REQUEST,
43+
);
44+
}
45+
46+
const objectMetadata = await this.objectMetadataRepository.findOne({
47+
where: {
48+
nameSingular: workflowActionInput.objectName,
49+
},
50+
});
51+
52+
if (!objectMetadata) {
53+
throw new RecordCRUDActionException(
54+
'Failed to delete: Object metadata not found',
55+
RecordCRUDActionExceptionCode.INVALID_REQUEST,
56+
);
57+
}
58+
2459
const objectRecord = await repository.findOne({
2560
where: {
2661
id: workflowActionInput.objectRecordId,
@@ -34,8 +69,21 @@ export class DeleteRecordWorkflowAction implements WorkflowAction {
3469
);
3570
}
3671

37-
await repository.update(workflowActionInput.objectRecordId, {
38-
deletedAt: new Date(),
72+
await repository.softDelete(workflowActionInput.objectRecordId);
73+
74+
this.workspaceEventEmitter.emitDatabaseBatchEvent({
75+
objectMetadataNameSingular: workflowActionInput.objectName,
76+
action: DatabaseEventAction.DELETED,
77+
events: [
78+
{
79+
recordId: objectRecord.id,
80+
objectMetadata,
81+
properties: {
82+
before: objectRecord,
83+
},
84+
},
85+
],
86+
workspaceId,
3987
});
4088

4189
return {

packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/record-crud-action.module.ts

+7-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
import { Module } from '@nestjs/common';
22

3+
import { NestjsQueryTypeOrmModule } from '@ptc-org/nestjs-query-typeorm';
4+
5+
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
36
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
47
import { WorkspaceCacheStorageModule } from 'src/engine/workspace-cache-storage/workspace-cache-storage.module';
58
import { CreateRecordWorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/create-record.workflow-action';
@@ -8,7 +11,10 @@ import { FindRecordsWorflowAction } from 'src/modules/workflow/workflow-executor
811
import { UpdateRecordWorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/update-record.workflow-action';
912

1013
@Module({
11-
imports: [WorkspaceCacheStorageModule],
14+
imports: [
15+
WorkspaceCacheStorageModule,
16+
NestjsQueryTypeOrmModule.forFeature([ObjectMetadataEntity], 'metadata'),
17+
],
1218
providers: [
1319
ScopedWorkspaceContextFactory,
1420
CreateRecordWorkflowAction,

packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/update-record.workflow-action.ts

+54-16
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
11
import { Injectable } from '@nestjs/common';
2+
import { InjectRepository } from '@nestjs/typeorm';
3+
4+
import { Repository } from 'typeorm';
25

36
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface';
47

8+
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
9+
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
510
import { getObjectMetadataMapItemByNameSingular } from 'src/engine/metadata-modules/utils/get-object-metadata-map-item-by-name-singular.util';
611
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
712
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
813
import { formatData } from 'src/engine/twenty-orm/utils/format-data.util';
914
import { WorkspaceCacheStorageService } from 'src/engine/workspace-cache-storage/workspace-cache-storage.service';
15+
import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter';
1016
import {
1117
RecordCRUDActionException,
1218
RecordCRUDActionExceptionCode,
@@ -20,6 +26,9 @@ export class UpdateRecordWorkflowAction implements WorkflowAction {
2026
private readonly twentyORMManager: TwentyORMManager,
2127
private readonly workspaceCacheStorageService: WorkspaceCacheStorageService,
2228
private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory,
29+
@InjectRepository(ObjectMetadataEntity, 'metadata')
30+
private readonly objectMetadataRepository: Repository<ObjectMetadataEntity>,
31+
private readonly workspaceEventEmitter: WorkspaceEventEmitter,
2332
) {}
2433

2534
async execute(
@@ -29,25 +38,38 @@ export class UpdateRecordWorkflowAction implements WorkflowAction {
2938
workflowActionInput.objectName,
3039
);
3140

32-
const objectRecord = await repository.findOne({
41+
const workspaceId = this.scopedWorkspaceContextFactory.create().workspaceId;
42+
43+
if (!workspaceId) {
44+
throw new RecordCRUDActionException(
45+
'Failed to update: Workspace ID is required',
46+
RecordCRUDActionExceptionCode.INVALID_REQUEST,
47+
);
48+
}
49+
50+
const objectMetadata = await this.objectMetadataRepository.findOne({
3351
where: {
34-
id: workflowActionInput.objectRecordId,
52+
nameSingular: workflowActionInput.objectName,
3553
},
3654
});
3755

38-
if (!objectRecord) {
56+
if (!objectMetadata) {
3957
throw new RecordCRUDActionException(
40-
`Failed to update: Record ${workflowActionInput.objectName} with id ${workflowActionInput.objectRecordId} not found`,
41-
RecordCRUDActionExceptionCode.RECORD_NOT_FOUND,
58+
'Failed to update: Object metadata not found',
59+
RecordCRUDActionExceptionCode.INVALID_REQUEST,
4260
);
4361
}
4462

45-
const workspaceId = this.scopedWorkspaceContextFactory.create().workspaceId;
63+
const previousObjectRecord = await repository.findOne({
64+
where: {
65+
id: workflowActionInput.objectRecordId,
66+
},
67+
});
4668

47-
if (!workspaceId) {
69+
if (!previousObjectRecord) {
4870
throw new RecordCRUDActionException(
49-
'Failed to read: Workspace ID is required',
50-
RecordCRUDActionExceptionCode.INVALID_REQUEST,
71+
`Failed to update: Record ${workflowActionInput.objectName} with id ${workflowActionInput.objectRecordId} not found`,
72+
RecordCRUDActionExceptionCode.RECORD_NOT_FOUND,
5173
);
5274
}
5375

@@ -89,9 +111,7 @@ export class UpdateRecordWorkflowAction implements WorkflowAction {
89111

90112
if (workflowActionInput.fieldsToUpdate.length === 0) {
91113
return {
92-
result: {
93-
...objectRecord,
94-
},
114+
result: previousObjectRecord,
95115
};
96116
}
97117

@@ -117,11 +137,29 @@ export class UpdateRecordWorkflowAction implements WorkflowAction {
117137
...objectRecordFormatted,
118138
});
119139

140+
const updatedObjectRecord = {
141+
...previousObjectRecord,
142+
...objectRecordWithFilteredFields,
143+
};
144+
145+
this.workspaceEventEmitter.emitDatabaseBatchEvent({
146+
objectMetadataNameSingular: workflowActionInput.objectName,
147+
action: DatabaseEventAction.UPDATED,
148+
events: [
149+
{
150+
recordId: previousObjectRecord.id,
151+
objectMetadata,
152+
properties: {
153+
before: previousObjectRecord,
154+
after: updatedObjectRecord,
155+
},
156+
},
157+
],
158+
workspaceId,
159+
});
160+
120161
return {
121-
result: {
122-
...objectRecord,
123-
...objectRecordWithFilteredFields,
124-
},
162+
result: updatedObjectRecord,
125163
};
126164
}
127165
}

packages/twenty-server/src/modules/workflow/workflow-runner/exceptions/workflow-run.exception.ts

+1
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,5 @@ export enum WorkflowRunExceptionCode {
1111
WORKFLOW_RUN_NOT_FOUND = 'WORKFLOW_RUN_NOT_FOUND',
1212
INVALID_OPERATION = 'INVALID_OPERATION',
1313
INVALID_INPUT = 'INVALID_INPUT',
14+
WORKFLOW_RUN_LIMIT_REACHED = 'WORKFLOW_RUN_LIMIT_REACHED',
1415
}

0 commit comments

Comments
 (0)