Skip to content

Commit

Permalink
Build a workspace service
Browse files Browse the repository at this point in the history
  • Loading branch information
thomtrp committed Aug 14, 2024
1 parent 899fc3c commit a2fd157
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { FieldMetadataType } from 'src/engine/metadata-modules/field-metadata/fi
export enum FieldActorSource {
EMAIL = 'EMAIL',
CALENDAR = 'CALENDAR',
WORKFLOW = 'WORKFLOW',
API = 'API',
IMPORT = 'IMPORT',
MANUAL = 'MANUAL',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ export const WORKFLOW_RUN_STANDARD_FIELD_IDS = {
startedAt: '20202020-a234-4e2d-bd15-85bcea6bb183',
endedAt: '20202020-e1c1-4b6b-bbbd-b2beaf2e159e',
status: '20202020-6b3e-4f9c-8c2b-2e5b8e6d6f3b',
createdBy: '20202020-6007-401a-8aa5-e6f38581a6f3',
};

export const WORKFLOW_VERSION_STANDARD_FIELD_IDS = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { Relation } from 'src/engine/workspace-manager/workspace-sync-metadata/interfaces/relation.interface';

import { FeatureFlagKey } from 'src/engine/core-modules/feature-flag/enums/feature-flag-key.enum';
import {
ActorMetadata,
FieldActorSource,
} from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type';
import { FieldMetadataType } from 'src/engine/metadata-modules/field-metadata/field-metadata.entity';
import { RelationMetadataType } from 'src/engine/metadata-modules/relation-metadata/relation-metadata.entity';
import { BaseWorkspaceEntity } from 'src/engine/twenty-orm/base.workspace-entity';
Expand Down Expand Up @@ -64,6 +68,19 @@ export class WorkflowRunWorkspaceEntity extends BaseWorkspaceEntity {
@WorkspaceIsNullable()
status: WorkflowRunStatus;

@WorkspaceField({
standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.createdBy,
type: FieldMetadataType.ACTOR,
label: 'Created by',
icon: 'IconCreativeCommonsSa',
description: 'The creator of the record',
defaultValue: {
source: `'${FieldActorSource.MANUAL}'`,
name: "''",
},
})
createdBy: ActorMetadata;

// Relations
@WorkspaceRelation({
standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.workflowVersion,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,35 +1,36 @@
import { Scope } from '@nestjs/common';

import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import { WorkflowCommonService } from 'src/modules/workflow/common/workflow-common.services';
import { WorkflowRunnerService } from 'src/modules/workflow/workflow-runner/workflow-runner.service';
import { WorkflowStatusService } from 'src/modules/workflow/workflow-status/workflow-status.service';
import { WorkflowStatusWorkspaceService } from 'src/modules/workflow/workflow-status/workflow-status.workspace-service';

export type RunWorkflowJobData = {
workspaceId: string;
workflowVersionId: string;
workflowRunId: string;
payload: object;
};

@Processor(MessageQueue.workflowQueue)
@Processor({ queueName: MessageQueue.workflowQueue, scope: Scope.REQUEST })
export class WorkflowRunnerJob {
constructor(
private readonly workflowCommonService: WorkflowCommonService,
private readonly workflowRunnerService: WorkflowRunnerService,
private readonly workflowStatusService: WorkflowStatusService,
private readonly workflowStatusWorkspaceService: WorkflowStatusWorkspaceService,
) {}

@Process(WorkflowRunnerJob.name)
async handle({
workspaceId,
workflowVersionId,
workflowRunId,
payload,
}: RunWorkflowJobData): Promise<void> {
await this.workflowStatusService.startWorkflowRun(
workspaceId,
workflowVersionId,
);
await this.workflowStatusWorkspaceService.startWorkflowRun(workflowRunId);

const workflowVersion = await this.workflowCommonService.getWorkflowVersion(
workspaceId,
Expand All @@ -42,9 +43,8 @@ export class WorkflowRunnerJob {
payload,
});

await this.workflowStatusService.endWorkflowRun(
workspaceId,
workflowVersionId,
await this.workflowStatusWorkspaceService.endWorkflowRun(
workflowRunId,
output.error ? WorkflowRunStatus.FAILED : WorkflowRunStatus.COMPLETED,
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ export class WorkflowStatusException extends CustomException {

export enum WorkflowStatusExceptionCode {
WORKFLOW_RUN_NOT_FOUND = 'WORKFLOW_RUN_NOT_FOUND',
WORKFLOW_RUN_ALREADY_STARTED = 'WORKFLOW_RUN_ALREADY_STARTED',
INVALID_OPERATION = 'INVALID_OPERATION',
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ import { Module } from '@nestjs/common';

import { TwentyORMModule } from 'src/engine/twenty-orm/twenty-orm.module';
import { WorkflowRunWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import { WorkflowStatusService } from 'src/modules/workflow/workflow-status/workflow-status.service';
import { WorkflowStatusWorkspaceService } from 'src/modules/workflow/workflow-status/workflow-status.workspace-service';

@Module({
imports: [TwentyORMModule.forFeature([WorkflowRunWorkspaceEntity])],
providers: [WorkflowStatusService],
exports: [WorkflowStatusService],
providers: [WorkflowStatusWorkspaceService],
exports: [WorkflowStatusWorkspaceService],
})
export class WorkflowStatusModule {}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import { Injectable } from '@nestjs/common';

import { ActorMetadata } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import {
WorkflowRunStatus,
WorkflowRunWorkspaceEntity,
} from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import {
WorkflowStatusException,
WorkflowStatusExceptionCode,
} from 'src/modules/workflow/workflow-status/workflow-status.exception';

@Injectable()
export class WorkflowStatusWorkspaceService {
constructor(private readonly twentyORMManager: TwentyORMManager) {}

async createWorkflowRun(workflowVersionId: string, createdBy: ActorMetadata) {
const workflowRunRepository =
await this.twentyORMManager.getRepository<WorkflowRunWorkspaceEntity>(
'workflowRun',
);

return (
await workflowRunRepository.save({
workflowVersionId,
createdBy,
status: WorkflowRunStatus.NOT_STARTED,
})
).id;
}

async startWorkflowRun(workflowRunId: string) {
const workflowRunRepository =
await this.twentyORMManager.getRepository<WorkflowRunWorkspaceEntity>(
'workflowRun',
);

const workflowRunToUpdate = await workflowRunRepository.findOneBy({
id: workflowRunId,
});

if (!workflowRunToUpdate) {
throw new WorkflowStatusException(
'No workflow run to start',
WorkflowStatusExceptionCode.WORKFLOW_RUN_NOT_FOUND,
);
}

if (workflowRunToUpdate.status !== WorkflowRunStatus.NOT_STARTED) {
throw new WorkflowStatusException(
'Workflow run already started',
WorkflowStatusExceptionCode.INVALID_OPERATION,
);
}

return workflowRunRepository.update(workflowRunToUpdate.id, {
status: WorkflowRunStatus.RUNNING,
startedAt: new Date().toISOString(),
});
}

async endWorkflowRun(workflowRunId: string, status: WorkflowRunStatus) {
const workflowRunRepository =
await this.twentyORMManager.getRepository<WorkflowRunWorkspaceEntity>(
'workflowRun',
);

const workflowRunToUpdate = await workflowRunRepository.findOneBy({
id: workflowRunId,
});

if (!workflowRunToUpdate) {
throw new WorkflowStatusException(
'No workflow run to end',
WorkflowStatusExceptionCode.WORKFLOW_RUN_NOT_FOUND,
);
}

if (workflowRunToUpdate.status !== WorkflowRunStatus.RUNNING) {
throw new WorkflowStatusException(
'Workflow cannot be ended as it is not running',
WorkflowStatusExceptionCode.INVALID_OPERATION,
);
}

return workflowRunRepository.update(workflowRunToUpdate.id, {
status,
endedAt: new Date().toISOString(),
});
}
}
Original file line number Diff line number Diff line change
@@ -1,33 +1,34 @@
import { Logger } from '@nestjs/common';
import { Logger, Scope } from '@nestjs/common';

import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { FieldActorSource } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity';
import {
RunWorkflowJobData,
WorkflowRunnerJob,
} from 'src/modules/workflow/workflow-runner/workflow-runner.job';
import { WorkflowStatusService } from 'src/modules/workflow/workflow-status/workflow-status.service';
import { WorkflowStatusWorkspaceService } from 'src/modules/workflow/workflow-status/workflow-status.workspace-service';

export type WorkflowEventTriggerJobData = {
workspaceId: string;
workflowId: string;
payload: object;
};

@Processor(MessageQueue.workflowQueue)
@Processor({ queueName: MessageQueue.workflowQueue, scope: Scope.REQUEST })
export class WorkflowEventTriggerJob {
private readonly logger = new Logger(WorkflowEventTriggerJob.name);

constructor(
@InjectMessageQueue(MessageQueue.workflowQueue)
private readonly messageQueueService: MessageQueueService,
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
private readonly workflowStatusService: WorkflowStatusService,
private readonly workflowStatusWorkspaceService: WorkflowStatusWorkspaceService,
) {}

@Process(WorkflowEventTriggerJob.name)
Expand All @@ -46,15 +47,20 @@ export class WorkflowEventTriggerJob {
throw new Error('Workflow has no published version');
}

await this.workflowStatusService.createWorkflowRun(
data.workspaceId,
workflow.publishedVersionId,
);
const workflowRunId =
await this.workflowStatusWorkspaceService.createWorkflowRun(
workflow.publishedVersionId,
{
source: FieldActorSource.WORKFLOW,
name: workflow.name,
},
);

this.messageQueueService.add<RunWorkflowJobData>(WorkflowRunnerJob.name, {
workspaceId: data.workspaceId,
workflowVersionId: workflow.publishedVersionId,
payload: data.payload,
workflowRunId,
});
}
}

0 comments on commit a2fd157

Please sign in to comment.