From e4ed6e166717f0c758b936dd145fb1d0a98aa978 Mon Sep 17 00:00:00 2001 From: Thomas Trompette Date: Wed, 14 Aug 2024 16:15:47 +0200 Subject: [PATCH] Build a workspace service --- .../composite-types/actor.composite-type.ts | 1 + .../constants/standard-field-ids.ts | 1 + .../workflow-run.workspace-entity.ts | 19 +++- .../workflow-runner/workflow-runner.job.ts | 20 ++-- .../workflow-status.exception.ts | 2 +- .../workflow-status/workflow-status.module.ts | 6 +- .../workflow-status.service.ts | 104 ------------------ .../workflow-status.workspace-service.ts | 92 ++++++++++++++++ .../jobs/workflow-event-trigger.job.ts | 22 ++-- 9 files changed, 139 insertions(+), 128 deletions(-) delete mode 100644 packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.service.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.workspace-service.ts diff --git a/packages/twenty-server/src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type.ts b/packages/twenty-server/src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type.ts index 69c6d256ab45..1efa0eeffff5 100644 --- a/packages/twenty-server/src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type.ts +++ b/packages/twenty-server/src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type.ts @@ -8,6 +8,7 @@ import { FieldMetadataType } from 'src/engine/metadata-modules/field-metadata/fi export enum FieldActorSource { EMAIL = 'EMAIL', CALENDAR = 'CALENDAR', + WORKFLOW = 'WORKFLOW', API = 'API', IMPORT = 'IMPORT', MANUAL = 'MANUAL', diff --git a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts index d8b7d070961d..5ce6734b97f8 100644 --- a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts +++ b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts @@ -416,6 +416,7 @@ export const WORKFLOW_RUN_STANDARD_FIELD_IDS = { startedAt: '20202020-a234-4e2d-bd15-85bcea6bb183', endedAt: '20202020-e1c1-4b6b-bbbd-b2beaf2e159e', status: '20202020-6b3e-4f9c-8c2b-2e5b8e6d6f3b', + createdBy: '20202020-6007-401a-8aa5-e6f38581a6f3', }; export const WORKFLOW_VERSION_STANDARD_FIELD_IDS = { diff --git a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts index 322ce04ab6ca..28b2ae26ea25 100644 --- a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts +++ b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts @@ -1,6 +1,10 @@ import { Relation } from 'src/engine/workspace-manager/workspace-sync-metadata/interfaces/relation.interface'; import { FeatureFlagKey } from 'src/engine/core-modules/feature-flag/enums/feature-flag-key.enum'; +import { + ActorMetadata, + FieldActorSource, +} from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type'; import { FieldMetadataType } from 'src/engine/metadata-modules/field-metadata/field-metadata.entity'; import { RelationMetadataType } from 'src/engine/metadata-modules/relation-metadata/relation-metadata.entity'; import { BaseWorkspaceEntity } from 'src/engine/twenty-orm/base.workspace-entity'; @@ -61,9 +65,21 @@ export class WorkflowRunWorkspaceEntity extends BaseWorkspaceEntity { description: 'Workflow run status', icon: 'IconHistory', }) - @WorkspaceIsNullable() status: WorkflowRunStatus; + @WorkspaceField({ + standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.createdBy, + type: FieldMetadataType.ACTOR, + label: 'Created by', + icon: 'IconCreativeCommonsSa', + description: 'The creator of the record', + defaultValue: { + source: `'${FieldActorSource.MANUAL}'`, + name: "''", + }, + }) + createdBy: ActorMetadata; + // Relations @WorkspaceRelation({ standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.workflowVersion, @@ -74,7 +90,6 @@ export class WorkflowRunWorkspaceEntity extends BaseWorkspaceEntity { inverseSideTarget: () => WorkflowVersionWorkspaceEntity, inverseSideFieldKey: 'runs', }) - @WorkspaceIsNullable() workflowVersion: Relation; @WorkspaceJoinColumn('workflowVersion') diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.job.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.job.ts index e6404d5dd494..176418b6b46e 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.job.ts @@ -1,35 +1,36 @@ +import { Scope } from '@nestjs/common'; + import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; import { WorkflowCommonService } from 'src/modules/workflow/common/workflow-common.services'; import { WorkflowRunnerService } from 'src/modules/workflow/workflow-runner/workflow-runner.service'; -import { WorkflowStatusService } from 'src/modules/workflow/workflow-status/workflow-status.service'; +import { WorkflowStatusWorkspaceService } from 'src/modules/workflow/workflow-status/workflow-status.workspace-service'; export type RunWorkflowJobData = { workspaceId: string; workflowVersionId: string; + workflowRunId: string; payload: object; }; -@Processor(MessageQueue.workflowQueue) +@Processor({ queueName: MessageQueue.workflowQueue, scope: Scope.REQUEST }) export class WorkflowRunnerJob { constructor( private readonly workflowCommonService: WorkflowCommonService, private readonly workflowRunnerService: WorkflowRunnerService, - private readonly workflowStatusService: WorkflowStatusService, + private readonly workflowStatusWorkspaceService: WorkflowStatusWorkspaceService, ) {} @Process(WorkflowRunnerJob.name) async handle({ workspaceId, workflowVersionId, + workflowRunId, payload, }: RunWorkflowJobData): Promise { - await this.workflowStatusService.startWorkflowRun( - workspaceId, - workflowVersionId, - ); + await this.workflowStatusWorkspaceService.startWorkflowRun(workflowRunId); const workflowVersion = await this.workflowCommonService.getWorkflowVersion( workspaceId, @@ -42,9 +43,8 @@ export class WorkflowRunnerJob { payload, }); - await this.workflowStatusService.endWorkflowRun( - workspaceId, - workflowVersionId, + await this.workflowStatusWorkspaceService.endWorkflowRun( + workflowRunId, output.error ? WorkflowRunStatus.FAILED : WorkflowRunStatus.COMPLETED, ); } diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.exception.ts b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.exception.ts index ac317871bce9..6510815f090c 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.exception.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.exception.ts @@ -9,5 +9,5 @@ export class WorkflowStatusException extends CustomException { export enum WorkflowStatusExceptionCode { WORKFLOW_RUN_NOT_FOUND = 'WORKFLOW_RUN_NOT_FOUND', - WORKFLOW_RUN_ALREADY_STARTED = 'WORKFLOW_RUN_ALREADY_STARTED', + INVALID_OPERATION = 'INVALID_OPERATION', } diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts index 5ea9f215d0a9..e27b5441df45 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts @@ -2,11 +2,11 @@ import { Module } from '@nestjs/common'; import { TwentyORMModule } from 'src/engine/twenty-orm/twenty-orm.module'; import { WorkflowRunWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; -import { WorkflowStatusService } from 'src/modules/workflow/workflow-status/workflow-status.service'; +import { WorkflowStatusWorkspaceService } from 'src/modules/workflow/workflow-status/workflow-status.workspace-service'; @Module({ imports: [TwentyORMModule.forFeature([WorkflowRunWorkspaceEntity])], - providers: [WorkflowStatusService], - exports: [WorkflowStatusService], + providers: [WorkflowStatusWorkspaceService], + exports: [WorkflowStatusWorkspaceService], }) export class WorkflowStatusModule {} diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.service.ts b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.service.ts deleted file mode 100644 index 2beb368cb737..000000000000 --- a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.service.ts +++ /dev/null @@ -1,104 +0,0 @@ -import { Injectable } from '@nestjs/common'; - -import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; -import { - WorkflowRunStatus, - WorkflowRunWorkspaceEntity, -} from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; -import { - WorkflowStatusException, - WorkflowStatusExceptionCode, -} from 'src/modules/workflow/workflow-status/workflow-status.exception'; - -@Injectable() -export class WorkflowStatusService { - constructor( - private readonly twentyORMGlobalManager: TwentyORMGlobalManager, - ) {} - - async createWorkflowRun(workspaceId: string, workflowVersionId: string) { - const workflowRunDataSource = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - workspaceId, - 'workflowRun', - ); - - const onGoingWorkflowRuns = ( - await workflowRunDataSource.findBy({ - workflowVersionId, - }) - ).filter((workflow) => - [WorkflowRunStatus.NOT_STARTED, WorkflowRunStatus.RUNNING].includes( - workflow.status, - ), - ); - - if (onGoingWorkflowRuns.length > 0) { - throw new WorkflowStatusException( - 'There is already an on going workflow run', - WorkflowStatusExceptionCode.WORKFLOW_RUN_ALREADY_STARTED, - ); - } - - const workflowRunToCreate = await workflowRunDataSource.create({ - workflowVersionId, - status: WorkflowRunStatus.NOT_STARTED, - }); - - return workflowRunDataSource.save(workflowRunToCreate); - } - - async startWorkflowRun(workspaceId: string, workflowVersionId: string) { - const workflowRunDataSource = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - workspaceId, - 'workflowRun', - ); - - const workflowRunToUpdate = await workflowRunDataSource.findOneBy({ - workflowVersionId, - status: WorkflowRunStatus.NOT_STARTED, - }); - - if (!workflowRunToUpdate) { - throw new WorkflowStatusException( - 'No workflow run to start', - WorkflowStatusExceptionCode.WORKFLOW_RUN_NOT_FOUND, - ); - } - - return workflowRunDataSource.update(workflowRunToUpdate.id, { - status: WorkflowRunStatus.RUNNING, - startedAt: new Date().toISOString(), - }); - } - - async endWorkflowRun( - workspaceId: string, - workflowVersionId: string, - status: WorkflowRunStatus, - ) { - const workflowRunDataSource = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - workspaceId, - 'workflowRun', - ); - - const workflowRunToUpdate = await workflowRunDataSource.findOneBy({ - workflowVersionId, - status: WorkflowRunStatus.RUNNING, - }); - - if (!workflowRunToUpdate) { - throw new WorkflowStatusException( - 'No workflow run to end', - WorkflowStatusExceptionCode.WORKFLOW_RUN_NOT_FOUND, - ); - } - - return workflowRunDataSource.update(workflowRunToUpdate.id, { - status, - endedAt: new Date().toISOString(), - }); - } -} diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.workspace-service.ts new file mode 100644 index 000000000000..949ae0c27e21 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.workspace-service.ts @@ -0,0 +1,92 @@ +import { Injectable } from '@nestjs/common'; + +import { ActorMetadata } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { + WorkflowRunStatus, + WorkflowRunWorkspaceEntity, +} from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; +import { + WorkflowStatusException, + WorkflowStatusExceptionCode, +} from 'src/modules/workflow/workflow-status/workflow-status.exception'; + +@Injectable() +export class WorkflowStatusWorkspaceService { + constructor(private readonly twentyORMManager: TwentyORMManager) {} + + async createWorkflowRun(workflowVersionId: string, createdBy: ActorMetadata) { + const workflowRunRepository = + await this.twentyORMManager.getRepository( + 'workflowRun', + ); + + return ( + await workflowRunRepository.save({ + workflowVersionId, + createdBy, + status: WorkflowRunStatus.NOT_STARTED, + }) + ).id; + } + + async startWorkflowRun(workflowRunId: string) { + const workflowRunRepository = + await this.twentyORMManager.getRepository( + 'workflowRun', + ); + + const workflowRunToUpdate = await workflowRunRepository.findOneBy({ + id: workflowRunId, + }); + + if (!workflowRunToUpdate) { + throw new WorkflowStatusException( + 'No workflow run to start', + WorkflowStatusExceptionCode.WORKFLOW_RUN_NOT_FOUND, + ); + } + + if (workflowRunToUpdate.status !== WorkflowRunStatus.NOT_STARTED) { + throw new WorkflowStatusException( + 'Workflow run already started', + WorkflowStatusExceptionCode.INVALID_OPERATION, + ); + } + + return workflowRunRepository.update(workflowRunToUpdate.id, { + status: WorkflowRunStatus.RUNNING, + startedAt: new Date().toISOString(), + }); + } + + async endWorkflowRun(workflowRunId: string, status: WorkflowRunStatus) { + const workflowRunRepository = + await this.twentyORMManager.getRepository( + 'workflowRun', + ); + + const workflowRunToUpdate = await workflowRunRepository.findOneBy({ + id: workflowRunId, + }); + + if (!workflowRunToUpdate) { + throw new WorkflowStatusException( + 'No workflow run to end', + WorkflowStatusExceptionCode.WORKFLOW_RUN_NOT_FOUND, + ); + } + + if (workflowRunToUpdate.status !== WorkflowRunStatus.RUNNING) { + throw new WorkflowStatusException( + 'Workflow cannot be ended as it is not running', + WorkflowStatusExceptionCode.INVALID_OPERATION, + ); + } + + return workflowRunRepository.update(workflowRunToUpdate.id, { + status, + endedAt: new Date().toISOString(), + }); + } +} diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job.ts index 1fbdc6d6c497..9b901092991b 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job.ts @@ -1,17 +1,18 @@ -import { Logger } from '@nestjs/common'; +import { Logger, Scope } from '@nestjs/common'; import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { FieldActorSource } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type'; import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; import { RunWorkflowJobData, WorkflowRunnerJob, } from 'src/modules/workflow/workflow-runner/workflow-runner.job'; -import { WorkflowStatusService } from 'src/modules/workflow/workflow-status/workflow-status.service'; +import { WorkflowStatusWorkspaceService } from 'src/modules/workflow/workflow-status/workflow-status.workspace-service'; export type WorkflowEventTriggerJobData = { workspaceId: string; @@ -19,7 +20,7 @@ export type WorkflowEventTriggerJobData = { payload: object; }; -@Processor(MessageQueue.workflowQueue) +@Processor({ queueName: MessageQueue.workflowQueue, scope: Scope.REQUEST }) export class WorkflowEventTriggerJob { private readonly logger = new Logger(WorkflowEventTriggerJob.name); @@ -27,7 +28,7 @@ export class WorkflowEventTriggerJob { @InjectMessageQueue(MessageQueue.workflowQueue) private readonly messageQueueService: MessageQueueService, private readonly twentyORMGlobalManager: TwentyORMGlobalManager, - private readonly workflowStatusService: WorkflowStatusService, + private readonly workflowStatusWorkspaceService: WorkflowStatusWorkspaceService, ) {} @Process(WorkflowEventTriggerJob.name) @@ -46,15 +47,20 @@ export class WorkflowEventTriggerJob { throw new Error('Workflow has no published version'); } - await this.workflowStatusService.createWorkflowRun( - data.workspaceId, - workflow.publishedVersionId, - ); + const workflowRunId = + await this.workflowStatusWorkspaceService.createWorkflowRun( + workflow.publishedVersionId, + { + source: FieldActorSource.WORKFLOW, + name: workflow.name, + }, + ); this.messageQueueService.add(WorkflowRunnerJob.name, { workspaceId: data.workspaceId, workflowVersionId: workflow.publishedVersionId, payload: data.payload, + workflowRunId, }); } }