Skip to content

Commit b42a067

Browse files
committed
Add backfill position job
1 parent f7cdd14 commit b42a067

File tree

7 files changed

+145
-101
lines changed

7 files changed

+145
-101
lines changed

packages/twenty-server/src/engine/api/graphql/workspace-query-builder/factories/record-position-query.factory.ts

+6-4
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ export enum RecordPositionQueryType {
1010
}
1111

1212
type FindByPositionQueryArgs = {
13-
positionValue: number;
13+
positionValue: number | null;
1414
recordPositionQueryType: RecordPositionQueryType.FIND_BY_POSITION;
1515
};
1616

@@ -77,10 +77,12 @@ export class RecordPositionQueryFactory {
7777
name: string,
7878
dataSourceSchema: string,
7979
): [RecordPositionQuery, RecordPositionQueryParams] {
80+
const positionStringParam = positionValue ? '= $1' : 'IS NULL';
81+
8082
return [
81-
`SELECT position FROM ${dataSourceSchema}."${name}"
82-
WHERE "position" = $1`,
83-
[positionValue],
83+
`SELECT id, position FROM ${dataSourceSchema}."${name}"
84+
WHERE "position" ${positionStringParam}`,
85+
positionValue ? [positionValue] : [],
8486
];
8587
}
8688

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import { Inject } from '@nestjs/common';
2+
3+
import { Command, CommandRunner, Option } from 'nest-commander';
4+
5+
import {
6+
RecordPositionBackfillJob,
7+
RecordPositionBackfillJobData,
8+
} from 'src/engine/api/graphql/workspace-query-runner/jobs/record-position-backfill.job';
9+
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
10+
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
11+
12+
export type RecordPositionBackfillCommandOptions = {
13+
workspaceId: string;
14+
dryRun?: boolean;
15+
};
16+
17+
@Command({
18+
name: 'backfill-record-position',
19+
description: 'Backfill record position',
20+
})
21+
export class RecordPositionBackfillCommand extends CommandRunner {
22+
constructor(
23+
@Inject(MessageQueue.recordPositionBackfillQueue)
24+
private readonly messageQueueService: MessageQueueService,
25+
) {
26+
super();
27+
}
28+
29+
@Option({
30+
flags: '-w, --workspace-id [workspace_id]',
31+
description: 'workspace id',
32+
required: true,
33+
})
34+
parseWorkspaceId(value: string): string {
35+
return value;
36+
}
37+
38+
@Option({
39+
flags: '-d, --dry-run [dry run]',
40+
description: 'Dry run: Log backfill actions.',
41+
required: false,
42+
})
43+
dryRun(value: string): boolean {
44+
return Boolean(value);
45+
}
46+
47+
async run(
48+
_passedParam: string[],
49+
options: RecordPositionBackfillCommandOptions,
50+
): Promise<void> {
51+
this.messageQueueService.add<RecordPositionBackfillJobData>(
52+
RecordPositionBackfillJob.name,
53+
{ workspaceId: options.workspaceId, dryRun: options.dryRun ?? false },
54+
{ retryLimit: 3 },
55+
);
56+
}
57+
}

packages/twenty-server/src/engine/api/graphql/workspace-query-runner/jobs/record-position-backfill.job.ts

+2-7
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,7 @@ import { RecordPositionBackfillService } from 'src/engine/api/graphql/workspace-
66

77
export type RecordPositionBackfillJobData = {
88
workspaceId: string;
9-
objectMetadata: { nameSingular: string; isCustom: boolean };
10-
recordId: string;
9+
dryRun: boolean;
1110
};
1211

1312
@Injectable()
@@ -19,10 +18,6 @@ export class RecordPositionBackfillJob
1918
) {}
2019

2120
async handle(data: RecordPositionBackfillJobData): Promise<void> {
22-
this.recordPositionBackfillService.backfill(
23-
data.workspaceId,
24-
data.objectMetadata,
25-
data.recordId,
26-
);
21+
this.recordPositionBackfillService.backfill(data.workspaceId, data.dryRun);
2722
}
2823
}

packages/twenty-server/src/engine/api/graphql/workspace-query-runner/listeners/record-position.listener.ts

-59
This file was deleted.

packages/twenty-server/src/engine/api/graphql/workspace-query-runner/services/record-position-backfill-module.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@ import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/works
44
import { RecordPositionQueryFactory } from 'src/engine/api/graphql/workspace-query-builder/factories/record-position-query.factory';
55
import { RecordPositionFactory } from 'src/engine/api/graphql/workspace-query-runner/factories/record-position.factory';
66
import { RecordPositionBackfillService } from 'src/engine/api/graphql/workspace-query-runner/services/record-position-backfill-service';
7+
import { ObjectMetadataModule } from 'src/engine/metadata-modules/object-metadata/object-metadata.module';
78

89
@Module({
9-
imports: [WorkspaceDataSourceModule],
10+
imports: [WorkspaceDataSourceModule, ObjectMetadataModule],
1011
providers: [
1112
RecordPositionFactory,
1213
RecordPositionQueryFactory,
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Injectable } from '@nestjs/common';
1+
import { Injectable, Logger } from '@nestjs/common';
22

33
import { ObjectMetadataInterface } from 'src/engine/metadata-modules/field-metadata/interfaces/object-metadata.interface';
44

@@ -8,44 +8,92 @@ import {
88
RecordPositionQueryType,
99
} from 'src/engine/api/graphql/workspace-query-builder/factories/record-position-query.factory';
1010
import { RecordPositionFactory } from 'src/engine/api/graphql/workspace-query-runner/factories/record-position.factory';
11+
import { ObjectMetadataService } from 'src/engine/metadata-modules/object-metadata/object-metadata.service';
1112

1213
@Injectable()
1314
export class RecordPositionBackfillService {
15+
private readonly logger = new Logger(RecordPositionBackfillService.name);
1416
constructor(
17+
private readonly objectMetadataService: ObjectMetadataService,
1518
private readonly recordPositionFactory: RecordPositionFactory,
1619
private readonly recordPositionQueryFactory: RecordPositionQueryFactory,
1720
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
1821
) {}
1922

20-
async backfill(
21-
workspaceId: string,
22-
objectMetadata: { nameSingular: string; isCustom: boolean },
23-
recordId: string,
24-
) {
25-
const position = await this.recordPositionFactory.create(
26-
'last',
27-
objectMetadata as ObjectMetadataInterface,
28-
workspaceId,
29-
);
30-
23+
async backfill(workspaceId: string, dryRun: boolean) {
3124
const dataSourceSchema =
3225
this.workspaceDataSourceService.getSchemaName(workspaceId);
3326

34-
const [query, params] = await this.recordPositionQueryFactory.create(
35-
{
36-
recordPositionQueryType: RecordPositionQueryType.UPDATE_POSITION,
37-
recordId,
38-
positionValue: position,
39-
},
40-
objectMetadata as ObjectMetadataInterface,
41-
dataSourceSchema,
42-
);
43-
44-
this.workspaceDataSourceService.executeRawQuery(
45-
query,
46-
params,
47-
workspaceId,
48-
undefined,
49-
);
27+
const objectMetadataEntities =
28+
await this.objectMetadataService.findManyWithinWorkspace(workspaceId, {
29+
where: { isSystem: false },
30+
});
31+
32+
const objectMetadataWithPosition =
33+
objectMetadataEntities.filter(hasPositionField);
34+
35+
for (const objectMetadata of objectMetadataWithPosition) {
36+
const [recordsWithoutPositionQuery, recordsWithoutPositionQueryParams] =
37+
await this.recordPositionQueryFactory.create(
38+
{
39+
recordPositionQueryType: RecordPositionQueryType.FIND_BY_POSITION,
40+
positionValue: null,
41+
},
42+
objectMetadata,
43+
dataSourceSchema,
44+
);
45+
46+
const recordsWithoutPosition =
47+
await this.workspaceDataSourceService.executeRawQuery(
48+
recordsWithoutPositionQuery,
49+
recordsWithoutPositionQueryParams,
50+
workspaceId,
51+
undefined,
52+
);
53+
54+
const position = await this.recordPositionFactory.create(
55+
'last',
56+
objectMetadata as ObjectMetadataInterface,
57+
workspaceId,
58+
);
59+
60+
for (
61+
let recordIndex = 0;
62+
recordIndex < recordsWithoutPosition.length;
63+
recordIndex++
64+
) {
65+
const recordId = recordsWithoutPosition[recordIndex].id;
66+
const backfilledPosition = position + recordIndex;
67+
68+
this.logger.log(
69+
`Backfilling position ${backfilledPosition} for ${objectMetadata.nameSingular} ${recordId}`,
70+
);
71+
72+
if (dryRun) {
73+
continue;
74+
}
75+
76+
const [query, params] = await this.recordPositionQueryFactory.create(
77+
{
78+
recordPositionQueryType: RecordPositionQueryType.UPDATE_POSITION,
79+
recordId: recordsWithoutPosition[recordIndex].id,
80+
positionValue: position + recordIndex,
81+
},
82+
objectMetadata,
83+
dataSourceSchema,
84+
);
85+
86+
await this.workspaceDataSourceService.executeRawQuery(
87+
query,
88+
params,
89+
workspaceId,
90+
undefined,
91+
);
92+
}
93+
}
5094
}
5195
}
96+
97+
const hasPositionField = (objectMetadata: ObjectMetadataInterface) =>
98+
['company', 'opportunity', 'person'].includes(objectMetadata.nameSingular) ||
99+
objectMetadata.isCustom;

packages/twenty-server/src/engine/api/graphql/workspace-query-runner/workspace-query-runner.module.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@ import { WorkspaceQueryBuilderModule } from 'src/engine/api/graphql/workspace-qu
55
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
66
import { WorkspacePreQueryHookModule } from 'src/engine/api/graphql/workspace-query-runner/workspace-pre-query-hook/workspace-pre-query-hook.module';
77
import { workspaceQueryRunnerFactories } from 'src/engine/api/graphql/workspace-query-runner/factories';
8-
import { RecordPositionListener } from 'src/engine/api/graphql/workspace-query-runner/listeners/record-position.listener';
98
import { AuthModule } from 'src/engine/core-modules/auth/auth.module';
109
import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity';
1110
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
1211
import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity';
1312
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
1413
import { TelemetryListener } from 'src/engine/api/graphql/workspace-query-runner/listeners/telemetry.listener';
1514
import { AnalyticsModule } from 'src/engine/core-modules/analytics/analytics.module';
15+
import { RecordPositionBackfillCommand } from 'src/engine/api/graphql/workspace-query-runner/commands/record-position-backfill.command';
1616

1717
import { WorkspaceQueryRunnerService } from './workspace-query-runner.service';
1818

@@ -31,9 +31,9 @@ import { EntityEventsToDbListener } from './listeners/entity-events-to-db.listen
3131
providers: [
3232
WorkspaceQueryRunnerService,
3333
...workspaceQueryRunnerFactories,
34-
RecordPositionListener,
3534
EntityEventsToDbListener,
3635
TelemetryListener,
36+
RecordPositionBackfillCommand,
3737
],
3838
exports: [WorkspaceQueryRunnerService],
3939
})

0 commit comments

Comments
 (0)