From 25f4e44aec8b630501d7aecae6f7773c0b235db3 Mon Sep 17 00:00:00 2001 From: Thomas Trompette Date: Tue, 4 Jun 2024 14:10:58 +0200 Subject: [PATCH] Add backfill position job by workspace (#5725) - Removing existing listener that was backfilling created records without position - Switch to a job that backfill all objects within workspace - Adapting `FIND_BY_POSITION` so it can fetch objects without position. Currently we needed to input a number --- .../record-position-query.factory.spec.ts | 10 +- .../record-position-query.factory.ts | 10 +- .../0-20-record-position-backfill.command.ts | 57 +++++++ .../jobs/record-position-backfill.job.ts | 9 +- .../listeners/record-position.listener.ts | 59 -------- .../record-position-backfill-service.spec.ts | 139 ++++++++++++++++++ .../record-position-backfill-module.ts | 3 +- .../record-position-backfill-service.ts | 120 +++++++++++---- .../workspace-query-runner.module.ts | 4 +- 9 files changed, 307 insertions(+), 104 deletions(-) create mode 100644 packages/twenty-server/src/engine/api/graphql/workspace-query-runner/commands/0-20-record-position-backfill.command.ts delete mode 100644 packages/twenty-server/src/engine/api/graphql/workspace-query-runner/listeners/record-position.listener.ts create mode 100644 packages/twenty-server/src/engine/api/graphql/workspace-query-runner/services/__tests__/record-position-backfill-service.spec.ts diff --git a/packages/twenty-server/src/engine/api/graphql/workspace-query-builder/factories/__tests__/record-position-query.factory.spec.ts b/packages/twenty-server/src/engine/api/graphql/workspace-query-builder/factories/__tests__/record-position-query.factory.spec.ts index 6bd4266e3c1b..106447ccfd05 100644 --- a/packages/twenty-server/src/engine/api/graphql/workspace-query-builder/factories/__tests__/record-position-query.factory.spec.ts +++ b/packages/twenty-server/src/engine/api/graphql/workspace-query-builder/factories/__tests__/record-position-query.factory.spec.ts @@ -19,14 +19,14 @@ describe('RecordPositionQueryFactory', () => { it('should return query and params for FIND_BY_POSITION', async () => { const positionValue = 1; const queryType = RecordPositionQueryType.FIND_BY_POSITION; - const [query, params] = await factory.create( + const [query, params] = factory.create( { positionValue, recordPositionQueryType: queryType }, objectMetadataItem, dataSourceSchema, ); expect(query).toEqual( - `SELECT position FROM ${dataSourceSchema}."${objectMetadataItem.nameSingular}" + `SELECT id, position FROM ${dataSourceSchema}."${objectMetadataItem.nameSingular}" WHERE "position" = $1`, ); expect(params).toEqual([positionValue]); @@ -34,7 +34,7 @@ describe('RecordPositionQueryFactory', () => { it('should return query and params for FIND_MIN_POSITION', async () => { const queryType = RecordPositionQueryType.FIND_MIN_POSITION; - const [query, params] = await factory.create( + const [query, params] = factory.create( { recordPositionQueryType: queryType }, objectMetadataItem, dataSourceSchema, @@ -48,7 +48,7 @@ describe('RecordPositionQueryFactory', () => { it('should return query and params for FIND_MAX_POSITION', async () => { const queryType = RecordPositionQueryType.FIND_MAX_POSITION; - const [query, params] = await factory.create( + const [query, params] = factory.create( { recordPositionQueryType: queryType }, objectMetadataItem, dataSourceSchema, @@ -64,7 +64,7 @@ describe('RecordPositionQueryFactory', () => { const positionValue = 1; const recordId = '1'; const queryType = RecordPositionQueryType.UPDATE_POSITION; - const [query, params] = await factory.create( + const [query, params] = factory.create( { positionValue, recordId, recordPositionQueryType: queryType }, objectMetadataItem, dataSourceSchema, diff --git a/packages/twenty-server/src/engine/api/graphql/workspace-query-builder/factories/record-position-query.factory.ts b/packages/twenty-server/src/engine/api/graphql/workspace-query-builder/factories/record-position-query.factory.ts index dbd279395bf8..2221423a90b4 100644 --- a/packages/twenty-server/src/engine/api/graphql/workspace-query-builder/factories/record-position-query.factory.ts +++ b/packages/twenty-server/src/engine/api/graphql/workspace-query-builder/factories/record-position-query.factory.ts @@ -10,7 +10,7 @@ export enum RecordPositionQueryType { } type FindByPositionQueryArgs = { - positionValue: number; + positionValue: number | null; recordPositionQueryType: RecordPositionQueryType.FIND_BY_POSITION; }; @@ -77,10 +77,12 @@ export class RecordPositionQueryFactory { name: string, dataSourceSchema: string, ): [RecordPositionQuery, RecordPositionQueryParams] { + const positionStringParam = positionValue ? '= $1' : 'IS NULL'; + return [ - `SELECT position FROM ${dataSourceSchema}."${name}" - WHERE "position" = $1`, - [positionValue], + `SELECT id, position FROM ${dataSourceSchema}."${name}" + WHERE "position" ${positionStringParam}`, + positionValue ? [positionValue] : [], ]; } diff --git a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/commands/0-20-record-position-backfill.command.ts b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/commands/0-20-record-position-backfill.command.ts new file mode 100644 index 000000000000..b5709a895481 --- /dev/null +++ b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/commands/0-20-record-position-backfill.command.ts @@ -0,0 +1,57 @@ +import { Inject } from '@nestjs/common'; + +import { Command, CommandRunner, Option } from 'nest-commander'; + +import { + RecordPositionBackfillJob, + RecordPositionBackfillJobData, +} from 'src/engine/api/graphql/workspace-query-runner/jobs/record-position-backfill.job'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; + +export type RecordPositionBackfillCommandOptions = { + workspaceId: string; + dryRun?: boolean; +}; + +@Command({ + name: 'migrate-0.20:backfill-record-position', + description: 'Backfill record position', +}) +export class RecordPositionBackfillCommand extends CommandRunner { + constructor( + @Inject(MessageQueue.recordPositionBackfillQueue) + private readonly messageQueueService: MessageQueueService, + ) { + super(); + } + + @Option({ + flags: '-w, --workspace-id [workspace_id]', + description: 'workspace id', + required: true, + }) + parseWorkspaceId(value: string): string { + return value; + } + + @Option({ + flags: '-d, --dry-run [dry run]', + description: 'Dry run: Log backfill actions.', + required: false, + }) + dryRun(value: string): boolean { + return Boolean(value); + } + + async run( + _passedParam: string[], + options: RecordPositionBackfillCommandOptions, + ): Promise { + this.messageQueueService.add( + RecordPositionBackfillJob.name, + { workspaceId: options.workspaceId, dryRun: options.dryRun ?? false }, + { retryLimit: 3 }, + ); + } +} diff --git a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/jobs/record-position-backfill.job.ts b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/jobs/record-position-backfill.job.ts index bbd777e35dc2..e0c589d9bdff 100644 --- a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/jobs/record-position-backfill.job.ts +++ b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/jobs/record-position-backfill.job.ts @@ -6,8 +6,7 @@ import { RecordPositionBackfillService } from 'src/engine/api/graphql/workspace- export type RecordPositionBackfillJobData = { workspaceId: string; - objectMetadata: { nameSingular: string; isCustom: boolean }; - recordId: string; + dryRun: boolean; }; @Injectable() @@ -19,10 +18,6 @@ export class RecordPositionBackfillJob ) {} async handle(data: RecordPositionBackfillJobData): Promise { - this.recordPositionBackfillService.backfill( - data.workspaceId, - data.objectMetadata, - data.recordId, - ); + this.recordPositionBackfillService.backfill(data.workspaceId, data.dryRun); } } diff --git a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/listeners/record-position.listener.ts b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/listeners/record-position.listener.ts deleted file mode 100644 index 2ea9d8afb2ae..000000000000 --- a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/listeners/record-position.listener.ts +++ /dev/null @@ -1,59 +0,0 @@ -import { Inject, Injectable } from '@nestjs/common'; -import { OnEvent } from '@nestjs/event-emitter'; - -import { ObjectMetadataInterface } from 'src/engine/metadata-modules/field-metadata/interfaces/object-metadata.interface'; - -import { ObjectRecordCreateEvent } from 'src/engine/integrations/event-emitter/types/object-record-create.event'; -import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; -import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; -import { - RecordPositionBackfillJob, - RecordPositionBackfillJobData, -} from 'src/engine/api/graphql/workspace-query-runner/jobs/record-position-backfill.job'; - -@Injectable() -export class RecordPositionListener { - constructor( - @Inject(MessageQueue.recordPositionBackfillQueue) - private readonly messageQueueService: MessageQueueService, - ) {} - - @OnEvent('*.created') - async handleAllCreate(payload: ObjectRecordCreateEvent) { - if (!hasPositionField(payload.objectMetadata)) { - return; - } - - if (hasPositionSet(payload.properties.after)) { - return; - } - - this.messageQueueService.add( - RecordPositionBackfillJob.name, - { - workspaceId: payload.workspaceId, - recordId: payload.recordId, - objectMetadata: { - nameSingular: payload.objectMetadata.nameSingular, - isCustom: payload.objectMetadata.isCustom, - }, - }, - ); - } -} - -// TODO: use objectMetadata instead of hardcoded standard objects name -const hasPositionField = ( - createdObjectMetadata: ObjectMetadataInterface, -): boolean => { - return ( - createdObjectMetadata.isCustom || - ['opportunity', 'company', 'people'].includes( - createdObjectMetadata.nameSingular, - ) - ); -}; - -const hasPositionSet = (createdRecord: any): boolean => { - return !!createdRecord?.position; -}; diff --git a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/services/__tests__/record-position-backfill-service.spec.ts b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/services/__tests__/record-position-backfill-service.spec.ts new file mode 100644 index 000000000000..2f4305dabee7 --- /dev/null +++ b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/services/__tests__/record-position-backfill-service.spec.ts @@ -0,0 +1,139 @@ +import { TestingModule, Test } from '@nestjs/testing'; + +import { RecordPositionQueryFactory } from 'src/engine/api/graphql/workspace-query-builder/factories/record-position-query.factory'; +import { RecordPositionFactory } from 'src/engine/api/graphql/workspace-query-runner/factories/record-position.factory'; +import { RecordPositionBackfillService } from 'src/engine/api/graphql/workspace-query-runner/services/record-position-backfill-service'; +import { ObjectMetadataService } from 'src/engine/metadata-modules/object-metadata/object-metadata.service'; +import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; + +describe('RecordPositionBackfillService', () => { + let recordPositionQueryFactory; + let recordPositionFactory; + let objectMetadataService; + let workspaceDataSourceService; + + let service: RecordPositionBackfillService; + + beforeEach(async () => { + recordPositionQueryFactory = { + create: jest.fn().mockReturnValue(['query', []]), + }; + + recordPositionFactory = { + create: jest.fn().mockResolvedValue([ + { + position: 1, + }, + ]), + }; + + objectMetadataService = { + findManyWithinWorkspace: jest.fn().mockReturnValue([]), + }; + + workspaceDataSourceService = { + getSchemaName: jest.fn().mockReturnValue('schemaName'), + executeRawQuery: jest.fn().mockResolvedValue([]), + }; + const module: TestingModule = await Test.createTestingModule({ + providers: [ + RecordPositionBackfillService, + { + provide: RecordPositionQueryFactory, + useValue: recordPositionQueryFactory, + }, + { + provide: RecordPositionFactory, + useValue: recordPositionFactory, + }, + { + provide: WorkspaceDataSourceService, + useValue: workspaceDataSourceService, + }, + { + provide: ObjectMetadataService, + useValue: objectMetadataService, + }, + ], + }).compile(); + + service = module.get( + RecordPositionBackfillService, + ); + }); + + afterEach(() => { + jest.clearAllMocks(); + }); + + it('should be defined', () => { + expect(service).toBeDefined(); + }); + + it('when no object metadata found, should do nothing', async () => { + await service.backfill('workspaceId', false); + expect(workspaceDataSourceService.executeRawQuery).not.toHaveBeenCalled(); + }); + + it('when objectMetadata without position, should do nothing', async () => { + objectMetadataService.findManyWithinWorkspace.mockReturnValue([ + { + id: '1', + nameSingular: 'name', + fields: [], + }, + ]); + await service.backfill('workspaceId', false); + expect(workspaceDataSourceService.executeRawQuery).not.toHaveBeenCalled(); + }); + + it('when objectMetadata but all record with position, should create and run query once', async () => { + objectMetadataService.findManyWithinWorkspace.mockReturnValue([ + { + id: '1', + nameSingular: 'company', + fields: [], + }, + ]); + await service.backfill('workspaceId', false); + expect(workspaceDataSourceService.executeRawQuery).toHaveBeenCalledTimes(1); + }); + + it('when record without position, should create and run query twice', async () => { + objectMetadataService.findManyWithinWorkspace.mockReturnValue([ + { + id: '1', + nameSingular: 'company', + fields: [], + }, + ]); + workspaceDataSourceService.executeRawQuery.mockResolvedValueOnce([ + { + id: '1', + }, + ]); + await service.backfill('workspaceId', false); + expect(workspaceDataSourceService.executeRawQuery).toHaveBeenCalledTimes(2); + expect(recordPositionFactory.create).toHaveBeenCalledTimes(1); + expect(recordPositionQueryFactory.create).toHaveBeenCalledTimes(2); + }); + + it('when dryRun is true, should not update position', async () => { + objectMetadataService.findManyWithinWorkspace.mockReturnValue([ + { + id: '1', + nameSingular: 'company', + fields: [], + }, + ]); + workspaceDataSourceService.executeRawQuery.mockResolvedValueOnce([ + { + id: '1', + }, + ]); + await service.backfill('workspaceId', true); + expect(workspaceDataSourceService.executeRawQuery).toHaveBeenCalledTimes(1); + expect(recordPositionFactory.create).toHaveBeenCalledTimes(1); + expect(recordPositionQueryFactory.create).toHaveBeenCalledTimes(1); + }); +}); diff --git a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/services/record-position-backfill-module.ts b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/services/record-position-backfill-module.ts index 1a058a46ed35..3934449a621e 100644 --- a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/services/record-position-backfill-module.ts +++ b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/services/record-position-backfill-module.ts @@ -4,9 +4,10 @@ import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/works import { RecordPositionQueryFactory } from 'src/engine/api/graphql/workspace-query-builder/factories/record-position-query.factory'; import { RecordPositionFactory } from 'src/engine/api/graphql/workspace-query-runner/factories/record-position.factory'; import { RecordPositionBackfillService } from 'src/engine/api/graphql/workspace-query-runner/services/record-position-backfill-service'; +import { ObjectMetadataModule } from 'src/engine/metadata-modules/object-metadata/object-metadata.module'; @Module({ - imports: [WorkspaceDataSourceModule], + imports: [WorkspaceDataSourceModule, ObjectMetadataModule], providers: [ RecordPositionFactory, RecordPositionQueryFactory, diff --git a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/services/record-position-backfill-service.ts b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/services/record-position-backfill-service.ts index 1937e60a3816..fa86853166c7 100644 --- a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/services/record-position-backfill-service.ts +++ b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/services/record-position-backfill-service.ts @@ -1,6 +1,6 @@ -import { Injectable } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; -import { ObjectMetadataInterface } from 'src/engine/metadata-modules/field-metadata/interfaces/object-metadata.interface'; +import { isDefined } from 'class-validator'; import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; import { @@ -8,44 +8,112 @@ import { RecordPositionQueryType, } from 'src/engine/api/graphql/workspace-query-builder/factories/record-position-query.factory'; import { RecordPositionFactory } from 'src/engine/api/graphql/workspace-query-runner/factories/record-position.factory'; +import { ObjectMetadataService } from 'src/engine/metadata-modules/object-metadata/object-metadata.service'; +import { hasPositionField } from 'src/engine/metadata-modules/object-metadata/utils/has-position-field.util'; @Injectable() export class RecordPositionBackfillService { + private readonly logger = new Logger(RecordPositionBackfillService.name); constructor( + private readonly objectMetadataService: ObjectMetadataService, private readonly recordPositionFactory: RecordPositionFactory, private readonly recordPositionQueryFactory: RecordPositionQueryFactory, private readonly workspaceDataSourceService: WorkspaceDataSourceService, ) {} - async backfill( - workspaceId: string, - objectMetadata: { nameSingular: string; isCustom: boolean }, - recordId: string, - ) { - const position = await this.recordPositionFactory.create( - 'last', - objectMetadata as ObjectMetadataInterface, - workspaceId, + async backfill(workspaceId: string, dryRun: boolean) { + this.logger.log( + `Starting backfilling record positions for workspace ${workspaceId}`, ); const dataSourceSchema = this.workspaceDataSourceService.getSchemaName(workspaceId); - const [query, params] = await this.recordPositionQueryFactory.create( - { - recordPositionQueryType: RecordPositionQueryType.UPDATE_POSITION, - recordId, - positionValue: position, - }, - objectMetadata as ObjectMetadataInterface, - dataSourceSchema, - ); + const objectMetadataEntities = + await this.objectMetadataService.findManyWithinWorkspace(workspaceId, { + where: { isSystem: false }, + }); - this.workspaceDataSourceService.executeRawQuery( - query, - params, - workspaceId, - undefined, - ); + const objectMetadataWithPosition = + objectMetadataEntities.filter(hasPositionField); + + for (const objectMetadata of objectMetadataWithPosition) { + const [recordsWithoutPositionQuery, recordsWithoutPositionQueryParams] = + this.recordPositionQueryFactory.create( + { + recordPositionQueryType: RecordPositionQueryType.FIND_BY_POSITION, + positionValue: null, + }, + objectMetadata, + dataSourceSchema, + ); + + const recordsWithoutPosition = + await this.workspaceDataSourceService.executeRawQuery( + recordsWithoutPositionQuery, + recordsWithoutPositionQueryParams, + workspaceId, + ); + + if ( + !isDefined(recordsWithoutPosition) || + recordsWithoutPosition?.length === 0 + ) { + this.logger.log( + `No records without position for ${objectMetadata.nameSingular}`, + ); + continue; + } + + const position = await this.recordPositionFactory.create( + 'last', + { + isCustom: objectMetadata.isCustom, + nameSingular: objectMetadata.nameSingular, + }, + workspaceId, + ); + + for ( + let recordIndex = 0; + recordIndex < recordsWithoutPosition.length; + recordIndex++ + ) { + const recordId = recordsWithoutPosition[recordIndex].id; + + if (!recordId) { + this.logger.log( + `Fetched record without id for ${objectMetadata.nameSingular}`, + ); + continue; + } + + const backfilledPosition = position + recordIndex; + + this.logger.log( + `Backfilling position ${backfilledPosition} for ${objectMetadata.nameSingular} ${recordId}`, + ); + + if (dryRun) { + continue; + } + + const [query, params] = this.recordPositionQueryFactory.create( + { + recordPositionQueryType: RecordPositionQueryType.UPDATE_POSITION, + recordId: recordsWithoutPosition[recordIndex].id, + positionValue: position + recordIndex, + }, + objectMetadata, + dataSourceSchema, + ); + + await this.workspaceDataSourceService.executeRawQuery( + query, + params, + workspaceId, + ); + } + } } } diff --git a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/workspace-query-runner.module.ts b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/workspace-query-runner.module.ts index e10d5db06ad2..7d1b990c2133 100644 --- a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/workspace-query-runner.module.ts +++ b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/workspace-query-runner.module.ts @@ -5,7 +5,6 @@ import { WorkspaceQueryBuilderModule } from 'src/engine/api/graphql/workspace-qu import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module'; import { WorkspacePreQueryHookModule } from 'src/engine/api/graphql/workspace-query-runner/workspace-pre-query-hook/workspace-pre-query-hook.module'; import { workspaceQueryRunnerFactories } from 'src/engine/api/graphql/workspace-query-runner/factories'; -import { RecordPositionListener } from 'src/engine/api/graphql/workspace-query-runner/listeners/record-position.listener'; import { AuthModule } from 'src/engine/core-modules/auth/auth.module'; import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity'; import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; @@ -13,6 +12,7 @@ import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/sta import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module'; import { TelemetryListener } from 'src/engine/api/graphql/workspace-query-runner/listeners/telemetry.listener'; import { AnalyticsModule } from 'src/engine/core-modules/analytics/analytics.module'; +import { RecordPositionBackfillCommand } from 'src/engine/api/graphql/workspace-query-runner/commands/0-20-record-position-backfill.command'; import { WorkspaceQueryRunnerService } from './workspace-query-runner.service'; @@ -31,9 +31,9 @@ import { EntityEventsToDbListener } from './listeners/entity-events-to-db.listen providers: [ WorkspaceQueryRunnerService, ...workspaceQueryRunnerFactories, - RecordPositionListener, EntityEventsToDbListener, TelemetryListener, + RecordPositionBackfillCommand, ], exports: [WorkspaceQueryRunnerService], })