Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trigger workflow on database event #6480

Merged
merged 2 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { TimelineMessagingModule } from 'src/engine/core-modules/messaging/timel
import { OpenApiModule } from 'src/engine/core-modules/open-api/open-api.module';
import { PostgresCredentialsModule } from 'src/engine/core-modules/postgres-credentials/postgres-credentials.module';
import { UserModule } from 'src/engine/core-modules/user/user.module';
import { WorkflowTriggerModule } from 'src/engine/core-modules/workflow/workflow-trigger.module';
import { WorkflowTriggerCoreModule } from 'src/engine/core-modules/workflow/core-workflow-trigger.module';
thomtrp marked this conversation as resolved.
Show resolved Hide resolved
import { WorkspaceModule } from 'src/engine/core-modules/workspace/workspace.module';

import { AnalyticsModule } from './analytics/analytics.module';
Expand All @@ -35,7 +35,7 @@ import { FileModule } from './file/file.module';
WorkspaceModule,
AISQLQueryModule,
PostgresCredentialsModule,
WorkflowTriggerModule,
WorkflowTriggerCoreModule,
thomtrp marked this conversation as resolved.
Show resolved Hide resolved
],
exports: [
AnalyticsModule,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ import { Module } from '@nestjs/common';

import { WorkflowTriggerResolver } from 'src/engine/core-modules/workflow/workflow-trigger.resolver';
import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module';
import { WorkflowTriggerService } from 'src/modules/workflow/workflow-trigger/workflow-trigger.service';
import { WorkflowRunnerModule } from 'src/modules/workflow/workflow-runner/workflow-runner.module';
import { WorkflowTriggerService } from 'src/modules/workflow/workflow-trigger/workflow-trigger.service';

@Module({
imports: [WorkflowCommonModule, WorkflowRunnerModule],
providers: [WorkflowTriggerService, WorkflowTriggerResolver],
})
export class WorkflowTriggerModule {}
export class WorkflowTriggerCoreModule {}
thomtrp marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { Field, InputType } from '@nestjs/graphql';

import graphqlTypeJson from 'graphql-type-json';

@InputType()
export class RunWorkflowVersionInput {
@Field(() => String, {
description: 'Workflow version ID',
nullable: false,
})
workflowVersionId: string;

@Field(() => graphqlTypeJson, {
description: 'Execution result in JSON format',
nullable: true,
})
payload?: JSON;
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { UseGuards } from '@nestjs/common';
import { Args, Mutation, Resolver } from '@nestjs/graphql';

import { RunWorkflowVersionInput } from 'src/engine/core-modules/workflow/dtos/run-workflow-version-input.dto';
import { WorkflowTriggerResultDTO } from 'src/engine/core-modules/workflow/dtos/workflow-trigger-result.dto';
import { workflowTriggerGraphqlApiExceptionHandler } from 'src/engine/core-modules/workflow/utils/workflow-trigger-graphql-api-exception-handler.util';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { AuthWorkspace } from 'src/engine/decorators/auth/auth-workspace.decorator';
import { JwtAuthGuard } from 'src/engine/guards/jwt.auth.guard';
import { WorkflowTriggerService } from 'src/modules/workflow/workflow-trigger/workflow-trigger.service';
import { WorkflowTriggerResultDTO } from 'src/engine/core-modules/workflow/dtos/workflow-trigger-result.dto';

@UseGuards(JwtAuthGuard)
@Resolver()
Expand All @@ -31,15 +32,16 @@ export class WorkflowTriggerResolver {
}

@Mutation(() => WorkflowTriggerResultDTO)
async triggerWorkflow(
async runWorkflowVersion(
@AuthWorkspace() { id: workspaceId }: Workspace,
@Args('workflowVersionId') workflowVersionId: string,
@Args('input') { workflowVersionId, payload }: RunWorkflowVersionInput,
) {
try {
return {
result: await this.workflowTriggerService.runWorkflow(
result: await this.workflowTriggerService.runWorkflowVersion(
workspaceId,
workflowVersionId,
payload ?? {},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Consider handling cases where payload might be undefined or null more explicitly.

),
};
} catch (error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { AutoCompaniesAndContactsCreationJobModule } from 'src/modules/contact-c
import { MessagingModule } from 'src/modules/messaging/messaging.module';
import { TimelineJobModule } from 'src/modules/timeline/jobs/timeline-job.module';
import { TimelineActivityModule } from 'src/modules/timeline/timeline-activity.module';
import { WorkflowModule } from 'src/modules/workflow/workflow.module';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Ensure WorkflowModule is correctly configured to avoid circular dependencies.


@Module({
imports: [
Expand All @@ -43,6 +44,7 @@ import { TimelineActivityModule } from 'src/modules/timeline/timeline-activity.m
WorkspaceQueryRunnerJobModule,
AutoCompaniesAndContactsCreationJobModule,
TimelineJobModule,
WorkflowModule,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Verify that adding WorkflowModule does not introduce performance bottlenecks.

],
providers: [
CleanInactiveWorkspaceJob,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { WorkflowAction } from 'src/modules/workflow/common/types/workflow-actio

export enum WorkflowTriggerType {
DATABASE_EVENT = 'DATABASE_EVENT',
MANUAL = 'MANUAL',
}
Comment on lines 4 to 5
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Removing the MANUAL trigger type might limit future flexibility. Consider if this could be needed again.


type BaseTrigger = {
Expand All @@ -19,10 +18,4 @@ export type WorkflowDatabaseEventTrigger = BaseTrigger & {
};
};

type WorkflowManualTrigger = BaseTrigger & {
type: WorkflowTriggerType.MANUAL;
};

export type WorkflowTrigger =
| WorkflowDatabaseEventTrigger
| WorkflowManualTrigger;
export type WorkflowTrigger = WorkflowDatabaseEventTrigger;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Ensure all references to WorkflowManualTrigger are removed or updated across the codebase.

Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ import { Injectable } from '@nestjs/common';

import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity';
import { WorkflowTrigger } from 'src/modules/workflow/common/types/workflow-trigger.type';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Reordering imports to maintain logical grouping. Ensure this does not affect dependency initialization.

import {
WorkflowTriggerException,
WorkflowTriggerExceptionCode,
} from 'src/modules/workflow/workflow-trigger/workflow-trigger.exception';
import { WorkflowTrigger } from 'src/modules/workflow/common/types/workflow-trigger.type';

@Injectable()
export class WorkflowCommonService {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,32 +1,54 @@
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
import { WorkflowRunnerService } from 'src/modules/workflow/workflow-runner/workflow-runner.service';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity';
import { WorkflowCommonService } from 'src/modules/workflow/common/workflow-common.services';
import { WorkflowRunnerService } from 'src/modules/workflow/workflow-runner/workflow-runner.service';

type RunWorkflowJobData = { workspaceId: string; workflowVersionId: string };
export type RunWorkflowJobData = {
workspaceId: string;
workflowId: string;
payload: object;
};

@Processor(MessageQueue.workflowQueue)
export class WorkflowRunnerJob {
constructor(
private readonly workflowCommonService: WorkflowCommonService,
private readonly workflowRunnerService: WorkflowRunnerService,
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
) {}

@Process(WorkflowRunnerJob.name)
async handle({
workspaceId,
workflowVersionId,
workflowId,
payload,
Comment on lines +26 to +27
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should be consistent between those handle function parameters and the runWorkflowparameters. Lets use the workflowId in runWorkflow then. Also, we should update the workflowCommonService.getWorkflowVersion so that we don't need twentyORMGlobalManager service here
Then the runWorkflow method can also use the updated version of workflowCommonService.getWorkflowVersion

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@martmull agreed with the input update. But thinking again about the endpoint, I think this is the version we want to run. This endpoint will be used to test versions. I rename it to be consistent. Happy to discuss it if you disagree!

}: RunWorkflowJobData): Promise<void> {
const workflowRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowWorkspaceEntity>(
workspaceId,
'workflow',
);
Comment on lines +30 to +33
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Fetching the repository for each job execution might introduce performance overhead. Consider caching the repository if possible.


const workflow = await workflowRepository.findOneByOrFail({
id: workflowId,
});

if (!workflow.publishedVersionId) {
throw new Error('Workflow has no published version');
}
Comment on lines +39 to +41
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Consider adding more specific error handling or logging to provide better insights into why a workflow might not have a published version.


const workflowVersion = await this.workflowCommonService.getWorkflowVersion(
workspaceId,
workflowVersionId,
workflow.publishedVersionId,
);

await this.workflowRunnerService.run({
action: workflowVersion.trigger.nextAction,
workspaceId,
payload: workflowVersion.trigger.input,
payload,
});
Comment on lines 48 to 52
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check: Ensure that the payload structure is validated before passing it to the workflow runner service to prevent runtime errors.

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { Logger } from '@nestjs/common';

import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import {
RunWorkflowJobData,
WorkflowRunnerJob,
} from 'src/modules/workflow/workflow-runner/workflow-runner.job';

export type WorkflowEventTriggerJobData = {
workspaceId: string;
workflowId: string;
payload: object;
};

@Processor(MessageQueue.workflowQueue)
export class WorkflowEventTriggerJob {
private readonly logger = new Logger(WorkflowEventTriggerJob.name);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Consider adding error handling to log or manage failures when adding a job to the message queue.


constructor(
@InjectMessageQueue(MessageQueue.workflowQueue)
private readonly messageQueueService: MessageQueueService,
) {}

@Process(WorkflowEventTriggerJob.name)
async handle(data: WorkflowEventTriggerJobData): Promise<void> {
this.messageQueueService.add<RunWorkflowJobData>(WorkflowRunnerJob.name, {
workspaceId: data.workspaceId,
workflowId: data.workflowId,
payload: data.payload,
});
Comment on lines +30 to +34
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check: Ensure that the payload data structure is validated before adding it to the message queue to prevent potential runtime errors.

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { Module } from '@nestjs/common';

import { WorkflowRunnerModule } from 'src/modules/workflow/workflow-runner/workflow-runner.module';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Ensure WorkflowRunnerModule can handle the additional load from new event triggers.

import { WorkflowEventTriggerJob } from 'src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Verify WorkflowEventTriggerJob implementation covers all edge cases and error handling.


@Module({
imports: [WorkflowRunnerModule],
providers: [WorkflowEventTriggerJob],
})
export class WorkflowTriggerJobModule {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import { Injectable, Logger } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';

import { FeatureFlagKey } from 'src/engine/core-modules/feature-flag/enums/feature-flag-key.enum';
import { IsFeatureEnabledService } from 'src/engine/core-modules/feature-flag/services/is-feature-enabled.service';
import { ObjectRecordCreateEvent } from 'src/engine/integrations/event-emitter/types/object-record-create.event';
import { ObjectRecordDeleteEvent } from 'src/engine/integrations/event-emitter/types/object-record-delete.event';
import { ObjectRecordUpdateEvent } from 'src/engine/integrations/event-emitter/types/object-record-update.event';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-event-listener.workspace-entity';
import {
WorkflowEventTriggerJob,
WorkflowEventTriggerJobData,
} from 'src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job';

@Injectable()
export class DatabaseEventTriggerListener {
private readonly logger = new Logger('DatabaseEventTriggerListener');

constructor(
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
@InjectMessageQueue(MessageQueue.workflowQueue)
private readonly messageQueueService: MessageQueueService,
private readonly isFeatureFlagEnabledService: IsFeatureEnabledService,
) {}

@OnEvent('*.created')
async handleObjectRecordCreateEvent(payload: ObjectRecordCreateEvent<any>) {
await this.handleEvent(payload);
}

@OnEvent('*.updated')
async handleObjectRecordUpdateEvent(payload: ObjectRecordUpdateEvent<any>) {
await this.handleEvent(payload);
}

@OnEvent('*.deleted')
async handleObjectRecordDeleteEvent(payload: ObjectRecordDeleteEvent<any>) {
await this.handleEvent(payload);
}

private async handleEvent(
payload:
| ObjectRecordCreateEvent<any>
| ObjectRecordUpdateEvent<any>
| ObjectRecordDeleteEvent<any>,
) {
const workspaceId = payload.workspaceId;
const eventName = payload.name;

if (!workspaceId || !eventName) {
this.logger.error(
`Missing workspaceId or eventName in payload ${JSON.stringify(
payload,
)}`,
);

return;
}
Comment on lines +54 to +62
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Ensure proper error handling and logging for missing workspaceId or eventName to avoid silent failures.


const isWorkflowEnabled =
await this.isFeatureFlagEnabledService.isFeatureEnabled(
FeatureFlagKey.IsWorkflowEnabled,
workspaceId,
);
Comment on lines +64 to +68
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Verify that the feature flag check is performant and does not introduce latency.


if (!isWorkflowEnabled) {
return;
}

const workflowEventListenerRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowEventListenerWorkspaceEntity>(
workspaceId,
'workflowEventListener',
);
Comment on lines +74 to +78
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Consider caching the repository instance to avoid repeated database lookups.


const eventListeners = await workflowEventListenerRepository.find({
where: {
eventName,
},
});
Comment on lines +74 to +84
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

think you can move that in workflow-common.service.ts in a getEventListeners method

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only place where we fetch eventListeners and not sure we will have others. I prefer put in common functions that will be used in several places, across sub-modules. Wdyt?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok


for (const eventListener of eventListeners) {
this.messageQueueService.add<WorkflowEventTriggerJobData>(
WorkflowEventTriggerJob.name,
{
workspaceId,
workflowId: eventListener.workflowId,
payload,
},
{ retryLimit: 3 },
);
Comment on lines +86 to +95
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Ensure that the message queue service handles retries and failures gracefully to avoid job loss.

}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { Module } from '@nestjs/common';

import { FeatureFlagModule } from 'src/engine/core-modules/feature-flag/feature-flag.module';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Ensure the FeatureFlagModule is correctly configured and handles all relevant feature flags.

import { DatabaseEventTriggerListener } from 'src/modules/workflow/workflow-trigger/listeners/database-event-trigger.listener';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Verify that DatabaseEventTriggerListener handles all edge cases and potential errors gracefully.


@Module({
imports: [FeatureFlagModule],
providers: [DatabaseEventTriggerListener],
})
export class WorkflowTriggerListenerModule {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { Module } from '@nestjs/common';

import { WorkflowTriggerJobModule } from 'src/modules/workflow/workflow-trigger/jobs/workflow-trigger-job.module';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Ensure WorkflowTriggerJobModule does not introduce circular dependencies.

import { WorkflowTriggerListenerModule } from 'src/modules/workflow/workflow-trigger/listeners/workflow-trigger-listener.module';
thomtrp marked this conversation as resolved.
Show resolved Hide resolved

@Module({
imports: [WorkflowTriggerJobModule, WorkflowTriggerListenerModule],
})
export class WorkflowTriggerModule {}
Loading
Loading