From 0b16d20c78c4ce865e4f8f045e89024c326f0c07 Mon Sep 17 00:00:00 2001 From: Thomas Trompette Date: Thu, 29 Aug 2024 13:16:30 +0200 Subject: [PATCH 1/4] Add listener to keep statuses up to date --- .../workflow-version.workspace-entity.ts | 11 +- .../workflow.workspace-entity.ts | 32 +- .../workflow-statuses-update.job.spec.ts | 307 ++++++++++++++++++ .../jobs/workflow-statuses-update.job.ts | 256 +++++++++++++++ .../workflow-version-status.listener.ts | 85 +++++ 5 files changed, 686 insertions(+), 5 deletions(-) create mode 100644 packages/twenty-server/src/modules/workflow/workflow-status/jobs/__tests__/workflow-statuses-update.job.spec.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-status/listeners/workflow-version-status.listener.ts diff --git a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-version.workspace-entity.ts b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-version.workspace-entity.ts index 5324aaf6323f..8149cfcc39f4 100644 --- a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-version.workspace-entity.ts +++ b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-version.workspace-entity.ts @@ -28,9 +28,10 @@ export enum WorkflowVersionStatus { DRAFT = 'DRAFT', ACTIVE = 'ACTIVE', DEACTIVATED = 'DEACTIVATED', + ARCHIVED = 'ARCHIVED', } -export const WorkflowVersionStatusOptions = [ +const WorkflowVersionStatusOptions = [ { value: WorkflowVersionStatus.DRAFT, label: 'Draft', @@ -47,7 +48,13 @@ export const WorkflowVersionStatusOptions = [ value: WorkflowVersionStatus.DEACTIVATED, label: 'Deactivated', position: 2, - color: 'gray', + color: 'red', + }, + { + value: WorkflowVersionStatus.ARCHIVED, + label: 'Archived', + position: 3, + color: 'grey', }, ]; diff --git a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow.workspace-entity.ts b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow.workspace-entity.ts index 455a7e02ba38..ef2e4da94963 100644 --- a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow.workspace-entity.ts +++ b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow.workspace-entity.ts @@ -20,10 +20,36 @@ import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/commo import { WorkflowRunWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; import { WorkflowVersionStatus, - WorkflowVersionStatusOptions, WorkflowVersionWorkspaceEntity, } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; +export enum WorkflowStatus { + DRAFT = 'DRAFT', + ACTIVE = 'ACTIVE', + DEACTIVATED = 'DEACTIVATED', +} + +const WorkflowStatusOptions = [ + { + value: WorkflowVersionStatus.DRAFT, + label: 'Draft', + position: 0, + color: 'yellow', + }, + { + value: WorkflowVersionStatus.ACTIVE, + label: 'Active', + position: 1, + color: 'green', + }, + { + value: WorkflowVersionStatus.DEACTIVATED, + label: 'Deactivated', + position: 2, + color: 'grey', + }, +]; + @WorkspaceEntity({ standardId: STANDARD_OBJECT_IDS.workflow, namePlural: 'workflows', @@ -61,10 +87,10 @@ export class WorkflowWorkspaceEntity extends BaseWorkspaceEntity { type: FieldMetadataType.MULTI_SELECT, label: 'Statuses', description: 'The current statuses of the workflow versions', - options: WorkflowVersionStatusOptions, + options: WorkflowStatusOptions, }) @WorkspaceIsNullable() - statuses: WorkflowVersionStatus[] | null; + statuses: WorkflowStatus[] | null; @WorkspaceField({ standardId: WORKFLOW_STANDARD_FIELD_IDS.position, diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/jobs/__tests__/workflow-statuses-update.job.spec.ts b/packages/twenty-server/src/modules/workflow/workflow-status/jobs/__tests__/workflow-statuses-update.job.spec.ts new file mode 100644 index 000000000000..8f5bceff0d29 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-status/jobs/__tests__/workflow-statuses-update.job.spec.ts @@ -0,0 +1,307 @@ +import { Test, TestingModule } from '@nestjs/testing'; + +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { WorkflowVersionStatus } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; +import { WorkflowStatus } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; +import { + WorkflowStatusesUpdateJob, + WorkflowVersionEvent, + WorkflowVersionEventType, +} from 'src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job'; + +describe('WorkflowStatusesUpdate', () => { + let job: WorkflowStatusesUpdateJob; + + const mockWorkflowVersionRepository = { + find: jest.fn(), + }; + + const mockWorkflowRepository = { + findOneOrFail: jest.fn(), + update: jest.fn(), + }; + + const mockTwentyORMManager = { + getRepository: jest + .fn() + .mockImplementation((name) => + name === 'workflow' + ? mockWorkflowRepository + : mockWorkflowVersionRepository, + ), + }; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [ + WorkflowStatusesUpdateJob, + { + provide: TwentyORMManager, + useValue: mockTwentyORMManager, + }, + ], + }).compile(); + + job = await module.resolve( + WorkflowStatusesUpdateJob, + ); + }); + + it('should be defined', () => { + expect(job).toBeDefined(); + }); + + describe('handle', () => { + beforeEach(() => { + // make twentyORMManager.getRepository return a mock object + TwentyORMManager.prototype.getRepository = jest.fn(); + }); + + describe('when event type is CREATE', () => { + it('when already a draft, do not change anything', async () => { + // Arrange + const event: WorkflowVersionEvent = { + workspaceId: '1', + type: WorkflowVersionEventType.CREATE, + workflowId: '1', + }; + + const mockWorkflow = { + statuses: [WorkflowStatus.DRAFT], + }; + + mockWorkflowRepository.findOneOrFail.mockResolvedValue(mockWorkflow); + + // Act + await job.handle(event); + + // Assert + expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1); + expect(mockWorkflowRepository.update).toHaveBeenCalledTimes(0); + }); + + it('when no draft yet, update statuses', async () => { + // Arrange + const event: WorkflowVersionEvent = { + workspaceId: '1', + type: WorkflowVersionEventType.CREATE, + workflowId: '1', + }; + + const mockWorkflow = { + statuses: [WorkflowStatus.ACTIVE], + }; + + mockWorkflowRepository.findOneOrFail.mockResolvedValue(mockWorkflow); + + // Act + await job.handle(event); + + // Assert + expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1); + expect(mockWorkflowRepository.update).toHaveBeenCalledTimes(1); + }); + }); + + describe('when event type is STATUS_UPDATE', () => { + test('when status is the same, should not do anything', async () => { + // Arrange + const event: WorkflowVersionEvent = { + workspaceId: '1', + type: WorkflowVersionEventType.STATUS_UPDATE, + workflowId: '1', + previousStatus: WorkflowVersionStatus.ACTIVE, + newStatus: WorkflowVersionStatus.ACTIVE, + }; + + const mockWorkflow = { + statuses: [WorkflowStatus.ACTIVE], + }; + + mockWorkflowRepository.findOneOrFail.mockResolvedValue(mockWorkflow); + + // Act + await job.handle(event); + + // Assert + expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1); + expect(mockWorkflowRepository.update).toHaveBeenCalledTimes(0); + }); + + test('when update that should be impossible, do not do anything', async () => { + // Arrange + const event: WorkflowVersionEvent = { + workspaceId: '1', + type: WorkflowVersionEventType.STATUS_UPDATE, + workflowId: '1', + previousStatus: WorkflowVersionStatus.ACTIVE, + newStatus: WorkflowVersionStatus.DRAFT, + }; + + const mockWorkflow = { + statuses: [WorkflowStatus.ACTIVE], + }; + + mockWorkflowRepository.findOneOrFail.mockResolvedValue(mockWorkflow); + + // Act + await job.handle(event); + + // Assert + expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1); + expect(mockWorkflowRepository.update).toHaveBeenCalledTimes(0); + }); + + test('when WorkflowVersionStatus.DEACTIVATED to WorkflowVersionStatus.ACTIVE, should activate', async () => { + // Arrange + const event: WorkflowVersionEvent = { + workspaceId: '1', + type: WorkflowVersionEventType.STATUS_UPDATE, + workflowId: '1', + previousStatus: WorkflowVersionStatus.DEACTIVATED, + newStatus: WorkflowVersionStatus.ACTIVE, + }; + + const mockWorkflow = { + statuses: [WorkflowStatus.DEACTIVATED], + }; + + mockWorkflowRepository.findOneOrFail.mockResolvedValue(mockWorkflow); + + // Act + await job.handle(event); + + // Assert + expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1); + expect(mockWorkflowRepository.update).toHaveBeenCalledWith( + { id: '1' }, + { statuses: [WorkflowStatus.ACTIVE] }, + ); + }); + + test('when WorkflowVersionStatus.ACTIVE to WorkflowVersionStatus.DEACTIVATED, should deactivate', async () => { + // Arrange + const event: WorkflowVersionEvent = { + workspaceId: '1', + type: WorkflowVersionEventType.STATUS_UPDATE, + workflowId: '1', + previousStatus: WorkflowVersionStatus.ACTIVE, + newStatus: WorkflowVersionStatus.DEACTIVATED, + }; + + const mockWorkflow = { + statuses: [WorkflowStatus.ACTIVE, WorkflowStatus.DRAFT], + }; + + mockWorkflowRepository.findOneOrFail.mockResolvedValue(mockWorkflow); + mockWorkflowVersionRepository.find.mockResolvedValue([]); + + // Act + await job.handle(event); + + // Assert + expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1); + expect(mockWorkflowVersionRepository.find).toHaveBeenCalledWith({ + where: { + workflowId: '1', + status: WorkflowVersionStatus.ACTIVE, + }, + }); + expect(mockWorkflowRepository.update).toHaveBeenCalledWith( + { id: '1' }, + { statuses: [WorkflowStatus.DEACTIVATED, WorkflowStatus.DRAFT] }, + ); + }); + + test('when WorkflowVersionStatus.DRAFT to WorkflowVersionStatus.ACTIVE, should activate', async () => { + // Arrange + const event: WorkflowVersionEvent = { + workspaceId: '1', + type: WorkflowVersionEventType.STATUS_UPDATE, + workflowId: '1', + previousStatus: WorkflowVersionStatus.DRAFT, + newStatus: WorkflowVersionStatus.ACTIVE, + }; + + const mockWorkflow = { + statuses: [WorkflowStatus.DRAFT], + }; + + mockWorkflowRepository.findOneOrFail.mockResolvedValue(mockWorkflow); + mockWorkflowVersionRepository.find.mockResolvedValue([]); + + // Act + await job.handle(event); + + // Assert + expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1); + expect(mockWorkflowVersionRepository.find).toHaveBeenCalledWith({ + where: { + workflowId: '1', + status: WorkflowVersionStatus.DRAFT, + }, + }); + expect(mockWorkflowRepository.update).toHaveBeenCalledWith( + { id: '1' }, + { statuses: [WorkflowStatus.ACTIVE] }, + ); + }); + }); + + describe('when event type is DELETE', () => { + test('when status is not draft, should not do anything', async () => { + // Arrange + const event: WorkflowVersionEvent = { + workspaceId: '1', + type: WorkflowVersionEventType.DELETE, + workflowId: '1', + }; + + const mockWorkflow = { + statuses: [WorkflowStatus.ACTIVE], + }; + + mockWorkflowRepository.findOneOrFail.mockResolvedValue(mockWorkflow); + + // Act + await job.handle(event); + + // Assert + expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1); + expect(mockWorkflowRepository.update).toHaveBeenCalledTimes(0); + }); + + test('when status is draft, should delete', async () => { + // Arrange + const event: WorkflowVersionEvent = { + workspaceId: '1', + type: WorkflowVersionEventType.DELETE, + workflowId: '1', + }; + + const mockWorkflow = { + statuses: [WorkflowStatus.DRAFT], + }; + + mockWorkflowRepository.findOneOrFail.mockResolvedValue(mockWorkflow); + mockWorkflowVersionRepository.find.mockResolvedValue([]); + + // Act + await job.handle(event); + + // Assert + expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1); + expect(mockWorkflowVersionRepository.find).toHaveBeenCalledWith({ + where: { + workflowId: '1', + status: WorkflowVersionStatus.DRAFT, + }, + }); + expect(mockWorkflowRepository.update).toHaveBeenCalledWith( + { id: '1' }, + { statuses: [] }, + ); + }); + }); + }); +}); diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job.ts b/packages/twenty-server/src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job.ts new file mode 100644 index 000000000000..fcfbd463cfd9 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job.ts @@ -0,0 +1,256 @@ +import { Scope } from '@nestjs/common'; + +import _ from 'lodash'; + +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { + WorkflowVersionStatus, + WorkflowVersionWorkspaceEntity, +} from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; +import { + WorkflowStatus, + WorkflowWorkspaceEntity, +} from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; + +export enum WorkflowVersionEventType { + CREATE = 'CREATE', + STATUS_UPDATE = 'STATUS_UPDATE', + DELETE = 'DELETE', +} + +export type WorkflowVersionEvent = { workspaceId: string } & ( + | WorkflowVersionCreated + | WorkflowVersionStatusUpdate + | WorkflowVersionDeleted +); + +export type WorkflowVersionCreated = { + type: WorkflowVersionEventType.CREATE; + workflowId: string; +}; + +export type WorkflowVersionStatusUpdate = { + type: WorkflowVersionEventType.STATUS_UPDATE; + workflowId: string; + previousStatus: WorkflowVersionStatus; + newStatus: WorkflowVersionStatus; +}; + +export type WorkflowVersionDeleted = { + type: WorkflowVersionEventType.DELETE; + workflowId: string; +}; + +@Processor({ queueName: MessageQueue.workflowQueue, scope: Scope.REQUEST }) +export class WorkflowStatusesUpdateJob { + constructor(private readonly twentyORMManager: TwentyORMManager) {} + + async handle(event: WorkflowVersionEvent): Promise { + switch (event.type) { + case WorkflowVersionEventType.CREATE: + return this.handleWorkflowVersionCreated(event); + case WorkflowVersionEventType.STATUS_UPDATE: + return this.handleWorkflowVersionStatusUpdated(event); + case WorkflowVersionEventType.DELETE: + return this.handleWorkflowVersionDeleted(event); + } + } + + private async handleWorkflowVersionCreated( + event: WorkflowVersionCreated, + ): Promise { + const workflowRepository = + await this.twentyORMManager.getRepository( + 'workflow', + ); + + const workflow = await workflowRepository.findOneOrFail({ + where: { + id: event.workflowId, + }, + }); + + const currentWorkflowStatuses = workflow.statuses || []; + + if (currentWorkflowStatuses.includes(WorkflowStatus.DRAFT)) { + return; + } + + await workflowRepository.update( + { + id: workflow.id, + }, + { + statuses: [...currentWorkflowStatuses, WorkflowStatus.DRAFT], + }, + ); + } + + private async handleWorkflowVersionStatusUpdated( + event: WorkflowVersionStatusUpdate, + ): Promise { + const workflowRepository = + await this.twentyORMManager.getRepository( + 'workflow', + ); + + const workflow = await workflowRepository.findOneOrFail({ + where: { + id: event.workflowId, + }, + }); + + const currentWorkflowStatuses = workflow.statuses || []; + let newWorkflowStatuses = new Set(currentWorkflowStatuses); + + if ( + event.previousStatus === WorkflowVersionStatus.DEACTIVATED && + event.newStatus === WorkflowVersionStatus.ACTIVE + ) { + newWorkflowStatuses = await this.buildStatusesFromNewActivation( + currentWorkflowStatuses, + ); + } + + if ( + event.previousStatus === WorkflowVersionStatus.ACTIVE && + event.newStatus === WorkflowVersionStatus.DEACTIVATED + ) { + newWorkflowStatuses = await this.buildStatusesFromDeactivation( + event.workflowId, + currentWorkflowStatuses, + ); + } + + if ( + event.previousStatus === WorkflowVersionStatus.DRAFT && + event.newStatus === WorkflowVersionStatus.ACTIVE + ) { + newWorkflowStatuses = await this.buildStatusesFromFirstActivation( + event.workflowId, + ); + } + + const newWorkflowStatusesArray = Array.from(newWorkflowStatuses); + + if ( + _.isEqual(newWorkflowStatusesArray.sort(), currentWorkflowStatuses.sort()) + ) { + return; + } + + await workflowRepository.update( + { + id: event.workflowId, + }, + { + statuses: newWorkflowStatusesArray, + }, + ); + } + + private async handleWorkflowVersionDeleted( + event: WorkflowVersionDeleted, + ): Promise { + const workflowRepository = + await this.twentyORMManager.getRepository( + 'workflow', + ); + + const workflow = await workflowRepository.findOneOrFail({ + where: { + id: event.workflowId, + }, + }); + + const currentWorkflowStatuses = workflow.statuses || []; + + if (!currentWorkflowStatuses.includes(WorkflowStatus.DRAFT)) { + return; + } + + const hasWorkflowVersionByStatus = await this.hasWorkflowVersionByStatus( + event.workflowId, + WorkflowVersionStatus.DRAFT, + ); + + if (hasWorkflowVersionByStatus) { + return; + } + + await workflowRepository.update( + { + id: event.workflowId, + }, + { + statuses: currentWorkflowStatuses.filter( + (status) => status !== WorkflowStatus.DRAFT, + ), + }, + ); + } + + private async buildStatusesFromFirstActivation(workflowId: string) { + const hasWorkflowVersionDraft = await this.hasWorkflowVersionByStatus( + workflowId, + WorkflowVersionStatus.DRAFT, + ); + + if (hasWorkflowVersionDraft) { + return new Set([WorkflowStatus.ACTIVE, WorkflowStatus.DRAFT]); + } + + return new Set([WorkflowStatus.ACTIVE]); + } + + private async buildStatusesFromDeactivation( + workflowId: string, + currentWorkflowStatuses: WorkflowStatus[], + ) { + const hasWorkflowVersionActive = await this.hasWorkflowVersionByStatus( + workflowId, + WorkflowVersionStatus.ACTIVE, + ); + + if (hasWorkflowVersionActive) { + return new Set(currentWorkflowStatuses); + } + + return new Set( + currentWorkflowStatuses + .filter((status) => status !== WorkflowStatus.ACTIVE) + .concat(WorkflowStatus.DEACTIVATED), + ); + } + + private async buildStatusesFromNewActivation( + currentWorkflowStatuses: WorkflowStatus[], + ) { + return new Set( + currentWorkflowStatuses + .filter((status) => status !== WorkflowStatus.DEACTIVATED) + .concat(WorkflowStatus.ACTIVE), + ); + } + + private async hasWorkflowVersionByStatus( + workflowId: string, + status: WorkflowVersionStatus, + ): Promise { + const workflowVersionRepository = + await this.twentyORMManager.getRepository( + 'workflowVersion', + ); + + const workflowVersions = await workflowVersionRepository.find({ + where: { + workflowId, + status, + }, + }); + + return workflowVersions.length > 0; + } +} diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/listeners/workflow-version-status.listener.ts b/packages/twenty-server/src/modules/workflow/workflow-status/listeners/workflow-version-status.listener.ts new file mode 100644 index 000000000000..4ffd249427a7 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-status/listeners/workflow-version-status.listener.ts @@ -0,0 +1,85 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { OnEvent } from '@nestjs/event-emitter'; + +import { ObjectRecordCreateEvent } from 'src/engine/integrations/event-emitter/types/object-record-create.event'; +import { ObjectRecordUpdateEvent } from 'src/engine/integrations/event-emitter/types/object-record-update.event'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { + WorkflowVersionStatus, + WorkflowVersionWorkspaceEntity, +} from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; +import { + WorkflowStatusesUpdateJob, + WorkflowVersionEvent, + WorkflowVersionEventType, +} from 'src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job'; + +@Injectable() +export class WorkflowVersionStatusListener { + private readonly logger = new Logger(WorkflowVersionStatusListener.name); + + constructor( + @InjectMessageQueue(MessageQueue.workflowQueue) + private readonly messageQueueService: MessageQueueService, + ) {} + + @OnEvent('workflowVersion.created') + async handleWorkflowVersionCreated( + event: ObjectRecordCreateEvent, + ): Promise { + await this.messageQueueService.add( + WorkflowStatusesUpdateJob.name, + { + type: WorkflowVersionEventType.CREATE, + workflowId: event.properties.after.workflowId, + }, + ); + } + + @OnEvent('workflowVersion.updated') + async handleWorkflowVersionUpdated( + event: ObjectRecordUpdateEvent, + ): Promise { + const workflowVersionBefore = event.properties.before; + const workflowVersionAfter = event.properties.after; + + if (workflowVersionBefore.status === workflowVersionAfter.status) { + return; + } + + await this.messageQueueService.add( + WorkflowStatusesUpdateJob.name, + { + type: WorkflowVersionEventType.STATUS_UPDATE, + workflowId: workflowVersionAfter.workflowId, + previousStatus: workflowVersionBefore.status, + newStatus: workflowVersionAfter.status, + }, + ); + } + + @OnEvent('workflowVersion.deleted') + async handleWorkflowVersionDeleted( + event: ObjectRecordCreateEvent, + ): Promise { + const workflowVersionDeleted = event.properties.after; + + if (workflowVersionDeleted.status !== WorkflowVersionStatus.DRAFT) { + this.logger.warn( + `Workflow version ${event.recordId} with status ${workflowVersionDeleted.status} was deleted.`, + ); + + return; + } + + await this.messageQueueService.add( + WorkflowStatusesUpdateJob.name, + { + type: WorkflowVersionEventType.DELETE, + workflowId: workflowVersionDeleted.workflowId, + }, + ); + } +} From ccf196f66c3591de13e25423b3af98da43004922 Mon Sep 17 00:00:00 2001 From: Thomas Trompette Date: Thu, 29 Aug 2024 15:45:42 +0200 Subject: [PATCH 2/4] Split into functions + rename endpoints --- .../workflow/workflow-trigger.resolver.ts | 8 +- .../workflow-common.workspace-service.ts | 10 + .../jobs/workflow-statuses-update.job.ts | 4 +- .../workflow-trigger.workspace-service.ts | 236 ++++++++++++------ 4 files changed, 179 insertions(+), 79 deletions(-) diff --git a/packages/twenty-server/src/engine/core-modules/workflow/workflow-trigger.resolver.ts b/packages/twenty-server/src/engine/core-modules/workflow/workflow-trigger.resolver.ts index 82aa30498f0a..630832a18edc 100644 --- a/packages/twenty-server/src/engine/core-modules/workflow/workflow-trigger.resolver.ts +++ b/packages/twenty-server/src/engine/core-modules/workflow/workflow-trigger.resolver.ts @@ -19,19 +19,19 @@ export class WorkflowTriggerResolver { ) {} @Mutation(() => Boolean) - async enableWorkflowTrigger( + async activateWorkflowVersion( @Args('workflowVersionId') workflowVersionId: string, ) { - return await this.workflowTriggerWorkspaceService.enableWorkflowTrigger( + return await this.workflowTriggerWorkspaceService.activateWorkflowVersion( workflowVersionId, ); } @Mutation(() => Boolean) - async disableWorkflowTrigger( + async deactivateWorkflowVersion( @Args('workflowVersionId') workflowVersionId: string, ) { - return await this.workflowTriggerWorkspaceService.disableWorkflowTrigger( + return await this.workflowTriggerWorkspaceService.deactivateWorkflowVersion( workflowVersionId, ); } diff --git a/packages/twenty-server/src/modules/workflow/common/workflow-common.workspace-service.ts b/packages/twenty-server/src/modules/workflow/common/workflow-common.workspace-service.ts index cfb44ea261cc..9d325f01bceb 100644 --- a/packages/twenty-server/src/modules/workflow/common/workflow-common.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/common/workflow-common.workspace-service.ts @@ -28,6 +28,16 @@ export class WorkflowCommonWorkspaceService { }, }); + return this.getValidWorkflowVersionOrFail(workflowVersion); + } + + async getValidWorkflowVersionOrFail( + workflowVersion: WorkflowVersionWorkspaceEntity | null, + ): Promise< + Omit & { + trigger: WorkflowTrigger; + } + > { if (!workflowVersion) { throw new WorkflowTriggerException( 'Workflow version not found', diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job.ts b/packages/twenty-server/src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job.ts index fcfbd463cfd9..4a65263ae8f9 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job.ts @@ -1,6 +1,6 @@ import { Scope } from '@nestjs/common'; -import _ from 'lodash'; +import { isEqual } from 'lodash'; import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; @@ -136,7 +136,7 @@ export class WorkflowStatusesUpdateJob { const newWorkflowStatusesArray = Array.from(newWorkflowStatuses); if ( - _.isEqual(newWorkflowStatusesArray.sort(), currentWorkflowStatuses.sort()) + isEqual(newWorkflowStatusesArray.sort(), currentWorkflowStatuses.sort()) ) { return; } diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.workspace-service.ts index 6e225bdcde5e..d9a75ca8b859 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.workspace-service.ts @@ -5,13 +5,17 @@ import { EntityManager } from 'typeorm'; import { buildCreatedByFromWorkspaceMember } from 'src/engine/core-modules/actor/utils/build-created-by-from-workspace-member.util'; import { User } from 'src/engine/core-modules/user/user.entity'; import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; +import { WorkspaceRepository } from 'src/engine/twenty-orm/repository/workspace.repository'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { WorkflowVersionStatus, WorkflowVersionWorkspaceEntity, } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; -import { WorkflowTriggerType } from 'src/modules/workflow/common/types/workflow-trigger.type'; +import { + WorkflowTrigger, + WorkflowTriggerType, +} from 'src/modules/workflow/common/types/workflow-trigger.type'; import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workflow-common.workspace-service'; import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-runner.workspace-service'; import { DatabaseEventTriggerService } from 'src/modules/workflow/workflow-trigger/database-event-trigger/database-event-trigger.service'; @@ -58,10 +62,19 @@ export class WorkflowTriggerWorkspaceService { ); } - async enableWorkflowTrigger(workflowVersionId: string) { + async activateWorkflowVersion(workflowVersionId: string) { + const workflowVersionRepository = + await this.twentyORMManager.getRepository( + 'workflowVersion', + ); + + const workflowVersionNullable = await workflowVersionRepository.findOne({ + where: { id: workflowVersionId }, + }); + const workflowVersion = - await this.workflowCommonWorkspaceService.getWorkflowVersionOrFail( - workflowVersionId, + await this.workflowCommonWorkspaceService.getValidWorkflowVersionOrFail( + workflowVersionNullable, ); const workflowRepository = @@ -91,38 +104,13 @@ export class WorkflowTriggerWorkspaceService { const manager = queryRunner.manager; try { - if ( - workflow.lastPublishedVersionId && - workflowVersionId !== workflow.lastPublishedVersionId - ) { - await this.disableWorkflowTriggerWithManager( - workflow.lastPublishedVersionId, - manager, - ); - } - - await this.activateWorkflowVersion( - workflowVersion.workflowId, - workflowVersionId, + await this.performActivationSteps( + workflow, + workflowVersion, + workflowRepository, + workflowVersionRepository, manager, ); - await workflowRepository.update( - { id: workflow.id }, - { lastPublishedVersionId: workflowVersionId }, - manager, - ); - - switch (workflowVersion.trigger.type) { - case WorkflowTriggerType.DATABASE_EVENT: - await this.databaseEventTriggerService.createEventListener( - workflowVersion.workflowId, - workflowVersion.trigger, - manager, - ); - break; - default: - break; - } await queryRunner.commitTransaction(); @@ -135,7 +123,7 @@ export class WorkflowTriggerWorkspaceService { } } - async disableWorkflowTrigger(workflowVersionId: string) { + async deactivateWorkflowVersion(workflowVersionId: string) { const workspaceDataSource = await this.twentyORMManager.getDatasource(); const queryRunner = workspaceDataSource.createQueryRunner(); @@ -143,8 +131,14 @@ export class WorkflowTriggerWorkspaceService { await queryRunner.startTransaction(); try { - await this.disableWorkflowTriggerWithManager( + const workflowVersionRepository = + await this.twentyORMManager.getRepository( + 'workflowVersion', + ); + + await this.performDeactivationSteps( workflowVersionId, + workflowVersionRepository, queryRunner.manager, ); @@ -159,61 +153,73 @@ export class WorkflowTriggerWorkspaceService { } } - private async disableWorkflowTriggerWithManager( - workflowVersionId: string, + private async performActivationSteps( + workflow: WorkflowWorkspaceEntity, + workflowVersion: Omit & { + trigger: WorkflowTrigger; + }, + workflowRepository: WorkspaceRepository, + workflowVersionRepository: WorkspaceRepository, manager: EntityManager, ) { - const workflowVersionRepository = - await this.twentyORMManager.getRepository( - 'workflowVersion', + if ( + workflow.lastPublishedVersionId && + workflowVersion.id !== workflow.lastPublishedVersionId + ) { + await this.performDeactivationSteps( + workflow.lastPublishedVersionId, + workflowVersionRepository, + manager, ); - const workflowVersion = await workflowVersionRepository.findOne({ - where: { id: workflowVersionId }, - }); - - if (!workflowVersion) { - throw new WorkflowTriggerException( - 'No workflow version found', - WorkflowTriggerExceptionCode.INVALID_INPUT, + await this.upgradeWorflowVersion( + workflow, + workflowVersion.id, + workflowRepository, + workflowVersionRepository, + manager, ); } - if (workflowVersion.status !== WorkflowVersionStatus.ACTIVE) { - throw new WorkflowTriggerException( - 'Cannot disable non-active workflow version', - WorkflowTriggerExceptionCode.INVALID_INPUT, + await this.setActiveVersionStatus( + workflowVersion.workflowId, + workflowVersion.id, + workflowVersionRepository, + manager, + ); + + await this.enableTrigger(workflowVersion, manager); + } + + private async performDeactivationSteps( + workflowVersionId: string, + workflowVersionRepository: WorkspaceRepository, + manager: EntityManager, + ) { + const workflowVersionNullable = await workflowVersionRepository.findOne({ + where: { id: workflowVersionId }, + }); + + const workflowVersion = + await this.workflowCommonWorkspaceService.getValidWorkflowVersionOrFail( + workflowVersionNullable, ); - } - await workflowVersionRepository.update( - { id: workflowVersionId }, - { status: WorkflowVersionStatus.DEACTIVATED }, + await this.setDeactivatedVersionStatus( + workflowVersion, + workflowVersionRepository, manager, ); - switch (workflowVersion?.trigger?.type) { - case WorkflowTriggerType.DATABASE_EVENT: - await this.databaseEventTriggerService.deleteEventListener( - workflowVersion.workflowId, - manager, - ); - break; - default: - break; - } + await this.disableTrigger(workflowVersion, manager); } - private async activateWorkflowVersion( + private async setActiveVersionStatus( workflowId: string, workflowVersionId: string, + workflowVersionRepository: WorkspaceRepository, manager: EntityManager, ) { - const workflowVersionRepository = - await this.twentyORMManager.getRepository( - 'workflowVersion', - ); - const activeWorkflowVersions = await workflowVersionRepository.find( { where: { workflowId, status: WorkflowVersionStatus.ACTIVE }, @@ -234,4 +240,88 @@ export class WorkflowTriggerWorkspaceService { manager, ); } + + private async setDeactivatedVersionStatus( + workflowVersion: Omit & { + trigger: WorkflowTrigger; + }, + workflowVersionRepository: WorkspaceRepository, + manager: EntityManager, + ) { + if (workflowVersion.status !== WorkflowVersionStatus.ACTIVE) { + throw new WorkflowTriggerException( + 'Cannot disable non-active workflow version', + WorkflowTriggerExceptionCode.FORBIDDEN, + ); + } + + await workflowVersionRepository.update( + { id: workflowVersion.id }, + { status: WorkflowVersionStatus.DEACTIVATED }, + manager, + ); + } + + private async upgradeWorflowVersion( + workflow: WorkflowWorkspaceEntity, + newPublishedVersionId: string, + workflowRepository: WorkspaceRepository, + workflowVersionRepository: WorkspaceRepository, + manager: EntityManager, + ) { + if (workflow.lastPublishedVersionId === newPublishedVersionId) { + return; + } + + if (workflow.lastPublishedVersionId) { + await workflowVersionRepository.update( + { id: workflow.lastPublishedVersionId }, + { status: WorkflowVersionStatus.ARCHIVED }, + manager, + ); + } + + await workflowRepository.update( + { id: workflow.id }, + { lastPublishedVersionId: newPublishedVersionId }, + manager, + ); + } + + private async enableTrigger( + workflowVersion: Omit & { + trigger: WorkflowTrigger; + }, + manager: EntityManager, + ) { + switch (workflowVersion.trigger.type) { + case WorkflowTriggerType.DATABASE_EVENT: + await this.databaseEventTriggerService.createEventListener( + workflowVersion.workflowId, + workflowVersion.trigger, + manager, + ); + break; + default: + break; + } + } + + private async disableTrigger( + workflowVersion: Omit & { + trigger: WorkflowTrigger; + }, + manager: EntityManager, + ) { + switch (workflowVersion.trigger.type) { + case WorkflowTriggerType.DATABASE_EVENT: + await this.databaseEventTriggerService.deleteEventListener( + workflowVersion.workflowId, + manager, + ); + break; + default: + break; + } + } } From fcda7569b0ad22c4de5c4b1abceb9909db65cae3 Mon Sep 17 00:00:00 2001 From: Thomas Trompette Date: Thu, 29 Aug 2024 16:41:47 +0200 Subject: [PATCH 3/4] Use batch events --- .../jobs/workflow-statuses-update.job.ts | 90 ++++++++++++------- .../workflow-version-status.listener.ts | 74 ++++++++++----- 2 files changed, 110 insertions(+), 54 deletions(-) diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job.ts b/packages/twenty-server/src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job.ts index 4a65263ae8f9..f9834563fa73 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job.ts @@ -1,6 +1,6 @@ import { Scope } from '@nestjs/common'; -import { isEqual } from 'lodash'; +import isEqual from 'lodash.isequal'; import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; @@ -20,46 +20,70 @@ export enum WorkflowVersionEventType { DELETE = 'DELETE', } -export type WorkflowVersionEvent = { workspaceId: string } & ( - | WorkflowVersionCreated - | WorkflowVersionStatusUpdate - | WorkflowVersionDeleted +export type WorkflowVersionBatchEvent = { + workspaceId: string; +} & ( + | WorkflowVersionBatchCreateEvent + | WorkflowVersionBatchStatusUpdate + | WorkflowVersionBatchDelete ); -export type WorkflowVersionCreated = { +export type WorkflowVersionBatchCreateEvent = { type: WorkflowVersionEventType.CREATE; - workflowId: string; +} & { + workflowIds: string[]; }; -export type WorkflowVersionStatusUpdate = { - type: WorkflowVersionEventType.STATUS_UPDATE; +type WorkflowVersionStatusUpdate = { workflowId: string; previousStatus: WorkflowVersionStatus; newStatus: WorkflowVersionStatus; }; -export type WorkflowVersionDeleted = { - type: WorkflowVersionEventType.DELETE; - workflowId: string; +export type WorkflowVersionBatchStatusUpdate = { + type: WorkflowVersionEventType.STATUS_UPDATE; +} & { + statusUpdates: WorkflowVersionStatusUpdate[]; }; +export type WorkflowVersionBatchDelete = { + type: WorkflowVersionEventType.DELETE; +} & { workflowIds: string[] }; + @Processor({ queueName: MessageQueue.workflowQueue, scope: Scope.REQUEST }) export class WorkflowStatusesUpdateJob { constructor(private readonly twentyORMManager: TwentyORMManager) {} - async handle(event: WorkflowVersionEvent): Promise { + async handle(event: WorkflowVersionBatchEvent): Promise { switch (event.type) { case WorkflowVersionEventType.CREATE: - return this.handleWorkflowVersionCreated(event); + await Promise.all( + event.workflowIds.map((workflowId) => + this.handleWorkflowVersionCreated(workflowId), + ), + ); + break; case WorkflowVersionEventType.STATUS_UPDATE: - return this.handleWorkflowVersionStatusUpdated(event); + await Promise.all( + event.statusUpdates.map((statusUpdate) => + this.handleWorkflowVersionStatusUpdated(statusUpdate), + ), + ); + break; case WorkflowVersionEventType.DELETE: - return this.handleWorkflowVersionDeleted(event); + await Promise.all( + event.workflowIds.map((workflowId) => + this.handleWorkflowVersionDeleted(workflowId), + ), + ); + break; + default: + break; } } private async handleWorkflowVersionCreated( - event: WorkflowVersionCreated, + workflowId: string, ): Promise { const workflowRepository = await this.twentyORMManager.getRepository( @@ -68,7 +92,7 @@ export class WorkflowStatusesUpdateJob { const workflow = await workflowRepository.findOneOrFail({ where: { - id: event.workflowId, + id: workflowId, }, }); @@ -89,7 +113,7 @@ export class WorkflowStatusesUpdateJob { } private async handleWorkflowVersionStatusUpdated( - event: WorkflowVersionStatusUpdate, + statusUpdate: WorkflowVersionStatusUpdate, ): Promise { const workflowRepository = await this.twentyORMManager.getRepository( @@ -98,7 +122,7 @@ export class WorkflowStatusesUpdateJob { const workflow = await workflowRepository.findOneOrFail({ where: { - id: event.workflowId, + id: statusUpdate.workflowId, }, }); @@ -106,8 +130,8 @@ export class WorkflowStatusesUpdateJob { let newWorkflowStatuses = new Set(currentWorkflowStatuses); if ( - event.previousStatus === WorkflowVersionStatus.DEACTIVATED && - event.newStatus === WorkflowVersionStatus.ACTIVE + statusUpdate.previousStatus === WorkflowVersionStatus.DEACTIVATED && + statusUpdate.newStatus === WorkflowVersionStatus.ACTIVE ) { newWorkflowStatuses = await this.buildStatusesFromNewActivation( currentWorkflowStatuses, @@ -115,21 +139,21 @@ export class WorkflowStatusesUpdateJob { } if ( - event.previousStatus === WorkflowVersionStatus.ACTIVE && - event.newStatus === WorkflowVersionStatus.DEACTIVATED + statusUpdate.previousStatus === WorkflowVersionStatus.ACTIVE && + statusUpdate.newStatus === WorkflowVersionStatus.DEACTIVATED ) { newWorkflowStatuses = await this.buildStatusesFromDeactivation( - event.workflowId, + statusUpdate.workflowId, currentWorkflowStatuses, ); } if ( - event.previousStatus === WorkflowVersionStatus.DRAFT && - event.newStatus === WorkflowVersionStatus.ACTIVE + statusUpdate.previousStatus === WorkflowVersionStatus.DRAFT && + statusUpdate.newStatus === WorkflowVersionStatus.ACTIVE ) { newWorkflowStatuses = await this.buildStatusesFromFirstActivation( - event.workflowId, + statusUpdate.workflowId, ); } @@ -143,7 +167,7 @@ export class WorkflowStatusesUpdateJob { await workflowRepository.update( { - id: event.workflowId, + id: statusUpdate.workflowId, }, { statuses: newWorkflowStatusesArray, @@ -152,7 +176,7 @@ export class WorkflowStatusesUpdateJob { } private async handleWorkflowVersionDeleted( - event: WorkflowVersionDeleted, + workflowId: string, ): Promise { const workflowRepository = await this.twentyORMManager.getRepository( @@ -161,7 +185,7 @@ export class WorkflowStatusesUpdateJob { const workflow = await workflowRepository.findOneOrFail({ where: { - id: event.workflowId, + id: workflowId, }, }); @@ -172,7 +196,7 @@ export class WorkflowStatusesUpdateJob { } const hasWorkflowVersionByStatus = await this.hasWorkflowVersionByStatus( - event.workflowId, + workflowId, WorkflowVersionStatus.DRAFT, ); @@ -182,7 +206,7 @@ export class WorkflowStatusesUpdateJob { await workflowRepository.update( { - id: event.workflowId, + id: workflowId, }, { statuses: currentWorkflowStatuses.filter( diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/listeners/workflow-version-status.listener.ts b/packages/twenty-server/src/modules/workflow/workflow-status/listeners/workflow-version-status.listener.ts index 4ffd249427a7..6f22f7a3b7d0 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-status/listeners/workflow-version-status.listener.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-status/listeners/workflow-version-status.listener.ts @@ -2,17 +2,19 @@ import { Injectable, Logger } from '@nestjs/common'; import { OnEvent } from '@nestjs/event-emitter'; import { ObjectRecordCreateEvent } from 'src/engine/integrations/event-emitter/types/object-record-create.event'; +import { ObjectRecordDeleteEvent } from 'src/engine/integrations/event-emitter/types/object-record-delete.event'; import { ObjectRecordUpdateEvent } from 'src/engine/integrations/event-emitter/types/object-record-update.event'; import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; import { WorkflowVersionStatus, WorkflowVersionWorkspaceEntity, } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; import { WorkflowStatusesUpdateJob, - WorkflowVersionEvent, + WorkflowVersionBatchEvent, WorkflowVersionEventType, } from 'src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job'; @@ -27,58 +29,88 @@ export class WorkflowVersionStatusListener { @OnEvent('workflowVersion.created') async handleWorkflowVersionCreated( - event: ObjectRecordCreateEvent, + payload: WorkspaceEventBatch< + ObjectRecordCreateEvent + >, ): Promise { - await this.messageQueueService.add( + const workflowIds = payload.events + .filter( + (event) => + event.properties.after.status === WorkflowVersionStatus.DRAFT, + ) + .map((event) => event.properties.after.workflowId); + + if (workflowIds.length === 0) { + return; + } + + await this.messageQueueService.add( WorkflowStatusesUpdateJob.name, { type: WorkflowVersionEventType.CREATE, - workflowId: event.properties.after.workflowId, + workspaceId: payload.workspaceId, + workflowIds, }, ); } @OnEvent('workflowVersion.updated') async handleWorkflowVersionUpdated( - event: ObjectRecordUpdateEvent, + payload: WorkspaceEventBatch< + ObjectRecordUpdateEvent + >, ): Promise { - const workflowVersionBefore = event.properties.before; - const workflowVersionAfter = event.properties.after; + const statusUpdates = payload.events + .map((event) => { + return { + workflowId: event.properties.after.workflowId, + previousStatus: event.properties.before.status, + newStatus: event.properties.after.status, + }; + }) + .filter( + (workflowVersionEvent) => + workflowVersionEvent.previousStatus !== + workflowVersionEvent.newStatus, + ); - if (workflowVersionBefore.status === workflowVersionAfter.status) { + if (statusUpdates.length === 0) { return; } - await this.messageQueueService.add( + await this.messageQueueService.add( WorkflowStatusesUpdateJob.name, { type: WorkflowVersionEventType.STATUS_UPDATE, - workflowId: workflowVersionAfter.workflowId, - previousStatus: workflowVersionBefore.status, - newStatus: workflowVersionAfter.status, + workspaceId: payload.workspaceId, + statusUpdates, }, ); } @OnEvent('workflowVersion.deleted') async handleWorkflowVersionDeleted( - event: ObjectRecordCreateEvent, + payload: WorkspaceEventBatch< + ObjectRecordDeleteEvent + >, ): Promise { - const workflowVersionDeleted = event.properties.after; - - if (workflowVersionDeleted.status !== WorkflowVersionStatus.DRAFT) { - this.logger.warn( - `Workflow version ${event.recordId} with status ${workflowVersionDeleted.status} was deleted.`, - ); + const workflowIds = payload.events + .filter( + (event) => + event.properties.before.status === WorkflowVersionStatus.DRAFT, + ) + .map((event) => event.properties.before.workflowId); + if (workflowIds.length === 0) { return; } - await this.messageQueueService.add( + await this.messageQueueService.add( WorkflowStatusesUpdateJob.name, { type: WorkflowVersionEventType.DELETE, - workflowId: workflowVersionDeleted.workflowId, + workspaceId: payload.workspaceId, + workflowIds, }, ); } From 04cbc4b3bdafab8466c810231aff0507712a6574 Mon Sep 17 00:00:00 2001 From: Thomas Trompette Date: Thu, 29 Aug 2024 17:56:06 +0200 Subject: [PATCH 4/4] Build new events --- .../workflow-statuses-update.job.spec.ts | 82 ++++++++++++------- .../jobs/workflow-statuses-update.job.ts | 4 +- .../workflow-version-status.listener.ts | 30 ++----- .../workflow-status/workflow-status.module.ts | 9 ++ .../workflow-trigger.workspace-service.ts | 73 +++++++++++++---- .../src/modules/workflow/workflow.module.ts | 3 +- 6 files changed, 133 insertions(+), 68 deletions(-) create mode 100644 packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/jobs/__tests__/workflow-statuses-update.job.spec.ts b/packages/twenty-server/src/modules/workflow/workflow-status/jobs/__tests__/workflow-statuses-update.job.spec.ts index 8f5bceff0d29..320ae3b7f63c 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-status/jobs/__tests__/workflow-statuses-update.job.spec.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-status/jobs/__tests__/workflow-statuses-update.job.spec.ts @@ -4,9 +4,9 @@ import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { WorkflowVersionStatus } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; import { WorkflowStatus } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; import { - WorkflowStatusesUpdateJob, - WorkflowVersionEvent, - WorkflowVersionEventType, + WorkflowStatusesUpdateJob, + WorkflowVersionBatchEvent, + WorkflowVersionEventType, } from 'src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job'; describe('WorkflowStatusesUpdate', () => { @@ -60,10 +60,10 @@ describe('WorkflowStatusesUpdate', () => { describe('when event type is CREATE', () => { it('when already a draft, do not change anything', async () => { // Arrange - const event: WorkflowVersionEvent = { + const event: WorkflowVersionBatchEvent = { workspaceId: '1', type: WorkflowVersionEventType.CREATE, - workflowId: '1', + workflowIds: ['1'], }; const mockWorkflow = { @@ -82,10 +82,10 @@ describe('WorkflowStatusesUpdate', () => { it('when no draft yet, update statuses', async () => { // Arrange - const event: WorkflowVersionEvent = { + const event: WorkflowVersionBatchEvent = { workspaceId: '1', type: WorkflowVersionEventType.CREATE, - workflowId: '1', + workflowIds: ['1'], }; const mockWorkflow = { @@ -106,12 +106,16 @@ describe('WorkflowStatusesUpdate', () => { describe('when event type is STATUS_UPDATE', () => { test('when status is the same, should not do anything', async () => { // Arrange - const event: WorkflowVersionEvent = { + const event: WorkflowVersionBatchEvent = { workspaceId: '1', type: WorkflowVersionEventType.STATUS_UPDATE, - workflowId: '1', - previousStatus: WorkflowVersionStatus.ACTIVE, - newStatus: WorkflowVersionStatus.ACTIVE, + statusUpdates: [ + { + workflowId: '1', + previousStatus: WorkflowVersionStatus.ACTIVE, + newStatus: WorkflowVersionStatus.ACTIVE, + }, + ], }; const mockWorkflow = { @@ -130,12 +134,16 @@ describe('WorkflowStatusesUpdate', () => { test('when update that should be impossible, do not do anything', async () => { // Arrange - const event: WorkflowVersionEvent = { + const event: WorkflowVersionBatchEvent = { workspaceId: '1', type: WorkflowVersionEventType.STATUS_UPDATE, - workflowId: '1', - previousStatus: WorkflowVersionStatus.ACTIVE, - newStatus: WorkflowVersionStatus.DRAFT, + statusUpdates: [ + { + workflowId: '1', + previousStatus: WorkflowVersionStatus.ACTIVE, + newStatus: WorkflowVersionStatus.DRAFT, + }, + ], }; const mockWorkflow = { @@ -154,12 +162,16 @@ describe('WorkflowStatusesUpdate', () => { test('when WorkflowVersionStatus.DEACTIVATED to WorkflowVersionStatus.ACTIVE, should activate', async () => { // Arrange - const event: WorkflowVersionEvent = { + const event: WorkflowVersionBatchEvent = { workspaceId: '1', type: WorkflowVersionEventType.STATUS_UPDATE, - workflowId: '1', - previousStatus: WorkflowVersionStatus.DEACTIVATED, - newStatus: WorkflowVersionStatus.ACTIVE, + statusUpdates: [ + { + workflowId: '1', + previousStatus: WorkflowVersionStatus.DEACTIVATED, + newStatus: WorkflowVersionStatus.ACTIVE, + }, + ], }; const mockWorkflow = { @@ -181,12 +193,16 @@ describe('WorkflowStatusesUpdate', () => { test('when WorkflowVersionStatus.ACTIVE to WorkflowVersionStatus.DEACTIVATED, should deactivate', async () => { // Arrange - const event: WorkflowVersionEvent = { + const event: WorkflowVersionBatchEvent = { workspaceId: '1', type: WorkflowVersionEventType.STATUS_UPDATE, - workflowId: '1', - previousStatus: WorkflowVersionStatus.ACTIVE, - newStatus: WorkflowVersionStatus.DEACTIVATED, + statusUpdates: [ + { + workflowId: '1', + previousStatus: WorkflowVersionStatus.ACTIVE, + newStatus: WorkflowVersionStatus.DEACTIVATED, + }, + ], }; const mockWorkflow = { @@ -215,12 +231,16 @@ describe('WorkflowStatusesUpdate', () => { test('when WorkflowVersionStatus.DRAFT to WorkflowVersionStatus.ACTIVE, should activate', async () => { // Arrange - const event: WorkflowVersionEvent = { + const event: WorkflowVersionBatchEvent = { workspaceId: '1', type: WorkflowVersionEventType.STATUS_UPDATE, - workflowId: '1', - previousStatus: WorkflowVersionStatus.DRAFT, - newStatus: WorkflowVersionStatus.ACTIVE, + statusUpdates: [ + { + workflowId: '1', + previousStatus: WorkflowVersionStatus.DRAFT, + newStatus: WorkflowVersionStatus.ACTIVE, + }, + ], }; const mockWorkflow = { @@ -251,10 +271,10 @@ describe('WorkflowStatusesUpdate', () => { describe('when event type is DELETE', () => { test('when status is not draft, should not do anything', async () => { // Arrange - const event: WorkflowVersionEvent = { + const event: WorkflowVersionBatchEvent = { workspaceId: '1', type: WorkflowVersionEventType.DELETE, - workflowId: '1', + workflowIds: ['1'], }; const mockWorkflow = { @@ -273,10 +293,10 @@ describe('WorkflowStatusesUpdate', () => { test('when status is draft, should delete', async () => { // Arrange - const event: WorkflowVersionEvent = { + const event: WorkflowVersionBatchEvent = { workspaceId: '1', type: WorkflowVersionEventType.DELETE, - workflowId: '1', + workflowIds: ['1'], }; const mockWorkflow = { diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job.ts b/packages/twenty-server/src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job.ts index f9834563fa73..62c05ffc5574 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job.ts @@ -2,6 +2,7 @@ import { Scope } from '@nestjs/common'; import isEqual from 'lodash.isequal'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; @@ -34,7 +35,7 @@ export type WorkflowVersionBatchCreateEvent = { workflowIds: string[]; }; -type WorkflowVersionStatusUpdate = { +export type WorkflowVersionStatusUpdate = { workflowId: string; previousStatus: WorkflowVersionStatus; newStatus: WorkflowVersionStatus; @@ -54,6 +55,7 @@ export type WorkflowVersionBatchDelete = { export class WorkflowStatusesUpdateJob { constructor(private readonly twentyORMManager: TwentyORMManager) {} + @Process(WorkflowStatusesUpdateJob.name) async handle(event: WorkflowVersionBatchEvent): Promise { switch (event.type) { case WorkflowVersionEventType.CREATE: diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/listeners/workflow-version-status.listener.ts b/packages/twenty-server/src/modules/workflow/workflow-status/listeners/workflow-version-status.listener.ts index 6f22f7a3b7d0..624336f3b2f1 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-status/listeners/workflow-version-status.listener.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-status/listeners/workflow-version-status.listener.ts @@ -1,9 +1,8 @@ -import { Injectable, Logger } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; import { OnEvent } from '@nestjs/event-emitter'; import { ObjectRecordCreateEvent } from 'src/engine/integrations/event-emitter/types/object-record-create.event'; import { ObjectRecordDeleteEvent } from 'src/engine/integrations/event-emitter/types/object-record-delete.event'; -import { ObjectRecordUpdateEvent } from 'src/engine/integrations/event-emitter/types/object-record-update.event'; import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; @@ -16,12 +15,11 @@ import { WorkflowStatusesUpdateJob, WorkflowVersionBatchEvent, WorkflowVersionEventType, + WorkflowVersionStatusUpdate, } from 'src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job'; @Injectable() export class WorkflowVersionStatusListener { - private readonly logger = new Logger(WorkflowVersionStatusListener.name); - constructor( @InjectMessageQueue(MessageQueue.workflowQueue) private readonly messageQueueService: MessageQueueService, @@ -36,6 +34,7 @@ export class WorkflowVersionStatusListener { const workflowIds = payload.events .filter( (event) => + !event.properties.after.status || event.properties.after.status === WorkflowVersionStatus.DRAFT, ) .map((event) => event.properties.after.workflowId); @@ -54,25 +53,14 @@ export class WorkflowVersionStatusListener { ); } - @OnEvent('workflowVersion.updated') + @OnEvent('workflowVersion.statusUpdated') async handleWorkflowVersionUpdated( - payload: WorkspaceEventBatch< - ObjectRecordUpdateEvent - >, + payload: WorkspaceEventBatch, ): Promise { - const statusUpdates = payload.events - .map((event) => { - return { - workflowId: event.properties.after.workflowId, - previousStatus: event.properties.before.status, - newStatus: event.properties.after.status, - }; - }) - .filter( - (workflowVersionEvent) => - workflowVersionEvent.previousStatus !== - workflowVersionEvent.newStatus, - ); + const statusUpdates = payload.events.filter( + (workflowVersionEvent) => + workflowVersionEvent.previousStatus !== workflowVersionEvent.newStatus, + ); if (statusUpdates.length === 0) { return; diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts new file mode 100644 index 000000000000..57c69f530e84 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts @@ -0,0 +1,9 @@ +import { Module } from '@nestjs/common'; + +import { WorkflowStatusesUpdateJob } from 'src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job'; +import { WorkflowVersionStatusListener } from 'src/modules/workflow/workflow-status/listeners/workflow-version-status.listener'; + +@Module({ + providers: [WorkflowStatusesUpdateJob, WorkflowVersionStatusListener], +}) +export class WorkflowStatusModule {} diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.workspace-service.ts index d9a75ca8b859..5a5663edec54 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.workspace-service.ts @@ -7,6 +7,7 @@ import { User } from 'src/engine/core-modules/user/user.entity'; import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; import { WorkspaceRepository } from 'src/engine/twenty-orm/repository/workspace.repository'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter'; import { WorkflowVersionStatus, WorkflowVersionWorkspaceEntity, @@ -18,6 +19,7 @@ import { } from 'src/modules/workflow/common/types/workflow-trigger.type'; import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workflow-common.workspace-service'; import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-runner.workspace-service'; +import { WorkflowVersionStatusUpdate } from 'src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job'; import { DatabaseEventTriggerService } from 'src/modules/workflow/workflow-trigger/database-event-trigger/database-event-trigger.service'; import { assertVersionCanBeActivated } from 'src/modules/workflow/workflow-trigger/utils/assert-version-can-be-activated.util'; import { @@ -33,6 +35,7 @@ export class WorkflowTriggerWorkspaceService { private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory, private readonly workflowRunnerWorkspaceService: WorkflowRunnerWorkspaceService, private readonly databaseEventTriggerService: DatabaseEventTriggerService, + private readonly workspaceEventEmitter: WorkspaceEventEmitter, ) {} async runWorkflowVersion( @@ -171,19 +174,18 @@ export class WorkflowTriggerWorkspaceService { workflowVersionRepository, manager, ); - - await this.upgradeWorflowVersion( - workflow, - workflowVersion.id, - workflowRepository, - workflowVersionRepository, - manager, - ); } - await this.setActiveVersionStatus( - workflowVersion.workflowId, + await this.upgradeWorflowVersion( + workflow, workflowVersion.id, + workflowRepository, + workflowVersionRepository, + manager, + ); + + await this.setActiveVersionStatus( + workflowVersion, workflowVersionRepository, manager, ); @@ -215,14 +217,18 @@ export class WorkflowTriggerWorkspaceService { } private async setActiveVersionStatus( - workflowId: string, - workflowVersionId: string, + workflowVersion: Omit & { + trigger: WorkflowTrigger; + }, workflowVersionRepository: WorkspaceRepository, manager: EntityManager, ) { const activeWorkflowVersions = await workflowVersionRepository.find( { - where: { workflowId, status: WorkflowVersionStatus.ACTIVE }, + where: { + workflowId: workflowVersion.workflowId, + status: WorkflowVersionStatus.ACTIVE, + }, }, manager, ); @@ -235,10 +241,16 @@ export class WorkflowTriggerWorkspaceService { } await workflowVersionRepository.update( - { id: workflowVersionId }, + { id: workflowVersion.id }, { status: WorkflowVersionStatus.ACTIVE }, manager, ); + + this.emitStatusUpdateEventOrThrow( + workflowVersion.workflowId, + workflowVersion.status, + WorkflowVersionStatus.ACTIVE, + ); } private async setDeactivatedVersionStatus( @@ -260,6 +272,12 @@ export class WorkflowTriggerWorkspaceService { { status: WorkflowVersionStatus.DEACTIVATED }, manager, ); + + this.emitStatusUpdateEventOrThrow( + workflowVersion.workflowId, + workflowVersion.status, + WorkflowVersionStatus.DEACTIVATED, + ); } private async upgradeWorflowVersion( @@ -324,4 +342,31 @@ export class WorkflowTriggerWorkspaceService { break; } } + + private emitStatusUpdateEventOrThrow( + workflowId: string, + previousStatus: WorkflowVersionStatus, + newStatus: WorkflowVersionStatus, + ) { + const workspaceId = this.scopedWorkspaceContextFactory.create().workspaceId; + + if (!workspaceId) { + throw new WorkflowTriggerException( + 'No workspace id found', + WorkflowTriggerExceptionCode.INTERNAL_ERROR, + ); + } + + this.workspaceEventEmitter.emit( + 'workflowVersion.statusUpdated', + [ + { + workflowId, + previousStatus, + newStatus, + } satisfies WorkflowVersionStatusUpdate, + ], + workspaceId, + ); + } } diff --git a/packages/twenty-server/src/modules/workflow/workflow.module.ts b/packages/twenty-server/src/modules/workflow/workflow.module.ts index 5f794b972a4d..08456972b3b5 100644 --- a/packages/twenty-server/src/modules/workflow/workflow.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow.module.ts @@ -1,8 +1,9 @@ import { Module } from '@nestjs/common'; +import { WorkflowStatusModule } from 'src/modules/workflow/workflow-status/workflow-status.module'; import { WorkflowTriggerModule } from 'src/modules/workflow/workflow-trigger/workflow-trigger.module'; @Module({ - imports: [WorkflowTriggerModule], + imports: [WorkflowTriggerModule, WorkflowStatusModule], }) export class WorkflowModule {}