Skip to content

Commit 87465b1

Browse files
authored
5507 modify the partial sync cron to work with the new statuses (#5512)
Closes #5507
1 parent 3de5ed3 commit 87465b1

File tree

31 files changed

+1185
-115
lines changed

31 files changed

+1185
-115
lines changed

packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts

+5-5
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@ import {
3232
MessageChannelVisibility,
3333
} from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
3434
import {
35-
GmailFullSyncJobData,
36-
GmailFullSyncJob,
37-
} from 'src/modules/messaging/jobs/gmail-full-sync.job';
35+
GmailFullMessageListFetchJobData,
36+
GmailFullMessageListFetchJob,
37+
} from 'src/modules/messaging/jobs/gmail-full-message-list-fetch.job';
3838

3939
@Injectable()
4040
export class GoogleAPIsService {
@@ -156,8 +156,8 @@ export class GoogleAPIsService {
156156
isCalendarEnabled: boolean,
157157
) {
158158
if (this.environmentService.get('MESSAGING_PROVIDER_GMAIL_ENABLED')) {
159-
await this.messageQueueService.add<GmailFullSyncJobData>(
160-
GmailFullSyncJob.name,
159+
await this.messageQueueService.add<GmailFullMessageListFetchJobData>(
160+
GmailFullMessageListFetchJob.name,
161161
{
162162
workspaceId,
163163
connectedAccountId,

packages/twenty-server/src/engine/integrations/message-queue/jobs.module.ts

+6-6
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ import { DataSourceModule } from 'src/engine/metadata-modules/data-source/data-s
1717
import { ObjectMetadataModule } from 'src/engine/metadata-modules/object-metadata/object-metadata.module';
1818
import { CleanInactiveWorkspaceJob } from 'src/engine/workspace-manager/workspace-cleaner/crons/clean-inactive-workspace.job';
1919
import { CalendarEventParticipantModule } from 'src/modules/calendar/services/calendar-event-participant/calendar-event-participant.module';
20-
import { GmailFetchMessageContentFromCacheModule } from 'src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.module';
21-
import { GmailFullSyncModule } from 'src/modules/messaging/services/gmail-full-sync/gmail-full-sync.module';
22-
import { GmailPartialSyncModule } from 'src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.module';
20+
import { GmailMessagesImportModule } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import.module';
21+
import { GmailFullMessageListFetchModule } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.module';
22+
import { GmailPartialMessageListFetchModule } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.module';
2323
import { TimelineActivityModule } from 'src/modules/timeline/timeline-activity.module';
2424
import { WorkspaceModule } from 'src/engine/core-modules/workspace/workspace.module';
2525
import { CalendarMessagingParticipantJobModule } from 'src/modules/calendar-messaging-participant/jobs/calendar-messaging-participant-job.module';
@@ -41,9 +41,9 @@ import { TimelineJobModule } from 'src/modules/timeline/jobs/timeline-job.module
4141
BillingModule,
4242
UserWorkspaceModule,
4343
WorkspaceModule,
44-
GmailFullSyncModule,
45-
GmailFetchMessageContentFromCacheModule,
46-
GmailPartialSyncModule,
44+
GmailFullMessageListFetchModule,
45+
GmailMessagesImportModule,
46+
GmailPartialMessageListFetchModule,
4747
CalendarEventParticipantModule,
4848
TimelineActivityModule,
4949
StripeModule,

packages/twenty-server/src/modules/connected-account/repositories/connected-account.repository.ts

+26
Original file line numberDiff line numberDiff line change
@@ -254,4 +254,30 @@ export class ConnectedAccountRepository {
254254
transactionManager,
255255
);
256256
}
257+
258+
public async getConnectedAccountOrThrow(
259+
workspaceId: string,
260+
connectedAccountId: string,
261+
): Promise<ObjectRecord<ConnectedAccountWorkspaceEntity>> {
262+
const connectedAccount = await this.getById(
263+
connectedAccountId,
264+
workspaceId,
265+
);
266+
267+
if (!connectedAccount) {
268+
throw new Error(
269+
`Connected account ${connectedAccountId} not found in workspace ${workspaceId}`,
270+
);
271+
}
272+
273+
const refreshToken = connectedAccount.refreshToken;
274+
275+
if (!refreshToken) {
276+
throw new Error(
277+
`No refresh token found for connected account ${connectedAccountId} in workspace ${workspaceId}`,
278+
);
279+
}
280+
281+
return connectedAccount;
282+
}
257283
}

packages/twenty-server/src/modules/messaging/crons/commands/gmail-partial-sync.cron.command.ts renamed to packages/twenty-server/src/modules/messaging/crons/commands/gmail-message-list-fetch.cron.command.ts

+4-4
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,16 @@ import { Command, CommandRunner } from 'nest-commander';
44

55
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
66
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
7-
import { GmailPartialSyncCronJob } from 'src/modules/messaging/crons/jobs/gmail-partial-sync.cron.job';
7+
import { GmailMessageListFetchCronJob } from 'src/modules/messaging/crons/jobs/gmail-message-list-fetch.cron.job';
88

99
const GMAIL_PARTIAL_SYNC_CRON_PATTERN = '*/5 * * * *';
1010

1111
@Command({
12-
name: 'cron:messaging:gmail-partial-sync',
12+
name: 'cron:messaging:gmail-message-list-fetch',
1313
description:
1414
'Starts a cron job to sync existing connected account messages and store them in the cache',
1515
})
16-
export class GmailPartialSyncCronCommand extends CommandRunner {
16+
export class GmailMessageListFetchCronCommand extends CommandRunner {
1717
constructor(
1818
@Inject(MessageQueue.cronQueue)
1919
private readonly messageQueueService: MessageQueueService,
@@ -23,7 +23,7 @@ export class GmailPartialSyncCronCommand extends CommandRunner {
2323

2424
async run(): Promise<void> {
2525
await this.messageQueueService.addCron<undefined>(
26-
GmailPartialSyncCronJob.name,
26+
GmailMessageListFetchCronJob.name,
2727
undefined,
2828
{
2929
repeat: { pattern: GMAIL_PARTIAL_SYNC_CRON_PATTERN },
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@ import { Command, CommandRunner } from 'nest-commander';
44

55
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
66
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
7-
import { GmailFetchMessagesFromCacheCronJob } from 'src/modules/messaging/crons/jobs/gmail-fetch-messages-from-cache.cron.job';
7+
import { GmailMessagesImportCronJob } from 'src/modules/messaging/crons/jobs/gmail-messages-import.cron.job';
88

99
@Command({
10-
name: 'cron:messaging:gmail-fetch-messages-from-cache',
10+
name: 'cron:messaging:gmail-messages-import',
1111
description: 'Starts a cron job to fetch all messages from cache',
1212
})
13-
export class GmailFetchMessagesFromCacheCronCommand extends CommandRunner {
13+
export class GmailMessagesImportCronCommand extends CommandRunner {
1414
constructor(
1515
@Inject(MessageQueue.cronQueue)
1616
private readonly messageQueueService: MessageQueueService,
@@ -20,7 +20,7 @@ export class GmailFetchMessagesFromCacheCronCommand extends CommandRunner {
2020

2121
async run(): Promise<void> {
2222
await this.messageQueueService.addCron<undefined>(
23-
GmailFetchMessagesFromCacheCronJob.name,
23+
GmailMessagesImportCronJob.name,
2424
undefined,
2525
{
2626
repeat: {

packages/twenty-server/src/modules/messaging/crons/commands/messaging-cron-commands.module.ts

+3-6
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,14 @@ import { Module } from '@nestjs/common';
22

33
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
44
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
5-
import { GmailFetchMessagesFromCacheCronCommand } from 'src/modules/messaging/crons/commands/gmail-fetch-messages-from-cache.cron.command';
6-
import { GmailPartialSyncCronCommand } from 'src/modules/messaging/crons/commands/gmail-partial-sync.cron.command';
5+
import { GmailMessagesImportCronCommand } from 'src/modules/messaging/crons/commands/gmail-messages-import.cron.command';
6+
import { GmailMessageListFetchCronCommand } from 'src/modules/messaging/crons/commands/gmail-message-list-fetch.cron.command';
77
@Module({
88
imports: [
99
ObjectMetadataRepositoryModule.forFeature([
1010
ConnectedAccountWorkspaceEntity,
1111
]),
1212
],
13-
providers: [
14-
GmailPartialSyncCronCommand,
15-
GmailFetchMessagesFromCacheCronCommand,
16-
],
13+
providers: [GmailMessageListFetchCronCommand, GmailMessagesImportCronCommand],
1714
})
1815
export class MessagingCronCommandsModule {}

packages/twenty-server/src/modules/messaging/crons/jobs/gmail-partial-sync.cron.job.ts renamed to packages/twenty-server/src/modules/messaging/crons/jobs/gmail-message-list-fetch.cron.job.ts

+51-17
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,29 @@ import { MessageQueueJob } from 'src/engine/integrations/message-queue/interface
88
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
99
import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity';
1010
import {
11-
GmailPartialSyncJobData,
12-
GmailPartialSyncJob,
13-
} from 'src/modules/messaging/jobs/gmail-partial-sync.job';
11+
GmailPartialMessageListFetchJobData,
12+
GmailPartialMessageListFetchJob,
13+
} from 'src/modules/messaging/jobs/gmail-partial-message-list-fetch.job';
1414
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
1515
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
1616
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
1717
import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
1818
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
1919
import { EnvironmentService } from 'src/engine/integrations/environment/environment.service';
20+
import {
21+
FeatureFlagEntity,
22+
FeatureFlagKeys,
23+
} from 'src/engine/core-modules/feature-flag/feature-flag.entity';
24+
import {
25+
GmailMessageListFetchJobData,
26+
GmailMessageListFetchJob,
27+
} from 'src/modules/messaging/jobs/gmail-message-list-fetch.job';
2028

2129
@Injectable()
22-
export class GmailPartialSyncCronJob implements MessageQueueJob<undefined> {
23-
private readonly logger = new Logger(GmailPartialSyncCronJob.name);
30+
export class GmailMessageListFetchCronJob
31+
implements MessageQueueJob<undefined>
32+
{
33+
private readonly logger = new Logger(GmailMessageListFetchCronJob.name);
2434

2535
constructor(
2636
@InjectRepository(Workspace, 'core')
@@ -32,6 +42,8 @@ export class GmailPartialSyncCronJob implements MessageQueueJob<undefined> {
3242
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
3343
private readonly messageChannelRepository: MessageChannelRepository,
3444
private readonly environmentService: EnvironmentService,
45+
@InjectRepository(FeatureFlagEntity, 'core')
46+
private readonly featureFlagRepository: Repository<FeatureFlagEntity>,
3547
) {}
3648

3749
async handle(): Promise<void> {
@@ -57,11 +69,20 @@ export class GmailPartialSyncCronJob implements MessageQueueJob<undefined> {
5769
);
5870

5971
for (const workspaceId of workspaceIdsWithDataSources) {
60-
await this.enqueuePartialSyncs(workspaceId);
72+
await this.enqueueSyncs(workspaceId);
6173
}
6274
}
6375

64-
private async enqueuePartialSyncs(workspaceId: string): Promise<void> {
76+
private async enqueueSyncs(workspaceId: string): Promise<void> {
77+
const isGmailSyncV2EnabledFeatureFlag =
78+
await this.featureFlagRepository.findOneBy({
79+
workspaceId: workspaceId,
80+
key: FeatureFlagKeys.IsGmailSyncV2Enabled,
81+
value: true,
82+
});
83+
84+
const isGmailSyncV2Enabled = isGmailSyncV2EnabledFeatureFlag?.value;
85+
6586
try {
6687
const messageChannels =
6788
await this.messageChannelRepository.getAll(workspaceId);
@@ -71,16 +92,29 @@ export class GmailPartialSyncCronJob implements MessageQueueJob<undefined> {
7192
continue;
7293
}
7394

74-
await this.messageQueueService.add<GmailPartialSyncJobData>(
75-
GmailPartialSyncJob.name,
76-
{
77-
workspaceId,
78-
connectedAccountId: messageChannel.connectedAccountId,
79-
},
80-
{
81-
retryLimit: 2,
82-
},
83-
);
95+
if (isGmailSyncV2Enabled) {
96+
await this.messageQueueService.add<GmailMessageListFetchJobData>(
97+
GmailMessageListFetchJob.name,
98+
{
99+
workspaceId,
100+
connectedAccountId: messageChannel.connectedAccountId,
101+
},
102+
{
103+
retryLimit: 2,
104+
},
105+
);
106+
} else {
107+
await this.messageQueueService.add<GmailPartialMessageListFetchJobData>(
108+
GmailPartialMessageListFetchJob.name,
109+
{
110+
workspaceId,
111+
connectedAccountId: messageChannel.connectedAccountId,
112+
},
113+
{
114+
retryLimit: 2,
115+
},
116+
);
117+
}
84118
}
85119
} catch (error) {
86120
this.logger.error(
+53-10
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Injectable } from '@nestjs/common';
1+
import { Injectable, Logger } from '@nestjs/common';
22
import { InjectRepository } from '@nestjs/typeorm';
33

44
import { Repository, In } from 'typeorm';
@@ -10,22 +10,34 @@ import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-s
1010
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
1111
import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
1212
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
13-
import { GmailFetchMessageContentFromCacheService } from 'src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.service';
13+
import { GmailMessagesImportService } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import.service';
1414
import { EnvironmentService } from 'src/engine/integrations/environment/environment.service';
15+
import { GmailMessagesImportV2Service } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import-v2.service';
16+
import {
17+
FeatureFlagEntity,
18+
FeatureFlagKeys,
19+
} from 'src/engine/core-modules/feature-flag/feature-flag.entity';
20+
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
21+
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
1522

1623
@Injectable()
17-
export class GmailFetchMessagesFromCacheCronJob
18-
implements MessageQueueJob<undefined>
19-
{
24+
export class GmailMessagesImportCronJob implements MessageQueueJob<undefined> {
25+
private readonly logger = new Logger(GmailMessagesImportCronJob.name);
26+
2027
constructor(
2128
@InjectRepository(Workspace, 'core')
2229
private readonly workspaceRepository: Repository<Workspace>,
2330
@InjectRepository(DataSourceEntity, 'metadata')
2431
private readonly dataSourceRepository: Repository<DataSourceEntity>,
2532
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
2633
private readonly messageChannelRepository: MessageChannelRepository,
27-
private readonly gmailFetchMessageContentFromCacheService: GmailFetchMessageContentFromCacheService,
34+
private readonly gmailFetchMessageContentFromCacheService: GmailMessagesImportService,
35+
private readonly gmailFetchMessageContentFromCacheV2Service: GmailMessagesImportV2Service,
36+
@InjectRepository(FeatureFlagEntity, 'core')
37+
private readonly featureFlagRepository: Repository<FeatureFlagEntity>,
2838
private readonly environmentService: EnvironmentService,
39+
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
40+
private readonly connectedAccountRepository: ConnectedAccountRepository,
2941
) {}
3042

3143
async handle(): Promise<void> {
@@ -59,11 +71,42 @@ export class GmailFetchMessagesFromCacheCronJob
5971
const messageChannels =
6072
await this.messageChannelRepository.getAll(workspaceId);
6173

74+
const isGmailSyncV2EnabledFeatureFlag =
75+
await this.featureFlagRepository.findOneBy({
76+
workspaceId: workspaceId,
77+
key: FeatureFlagKeys.IsGmailSyncV2Enabled,
78+
value: true,
79+
});
80+
81+
const isGmailSyncV2Enabled = isGmailSyncV2EnabledFeatureFlag?.value;
82+
6283
for (const messageChannel of messageChannels) {
63-
await this.gmailFetchMessageContentFromCacheService.fetchMessageContentFromCache(
64-
workspaceId,
65-
messageChannel.connectedAccountId,
66-
);
84+
if (!messageChannel?.isSyncEnabled) {
85+
continue;
86+
}
87+
88+
if (isGmailSyncV2Enabled) {
89+
try {
90+
const connectedAccount =
91+
await this.connectedAccountRepository.getConnectedAccountOrThrow(
92+
workspaceId,
93+
messageChannel.connectedAccountId,
94+
);
95+
96+
await this.gmailFetchMessageContentFromCacheV2Service.processMessageBatchImport(
97+
messageChannel,
98+
connectedAccount,
99+
workspaceId,
100+
);
101+
} catch (error) {
102+
this.logger.log(error.message);
103+
}
104+
} else {
105+
await this.gmailFetchMessageContentFromCacheService.fetchMessageContentFromCache(
106+
workspaceId,
107+
messageChannel.connectedAccountId,
108+
);
109+
}
67110
}
68111
}
69112
}

packages/twenty-server/src/modules/messaging/crons/jobs/messaging-cron-job.module.ts

+8-8
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,26 @@ import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-
55
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
66
import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity';
77
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
8-
import { GmailFetchMessagesFromCacheCronJob } from 'src/modules/messaging/crons/jobs/gmail-fetch-messages-from-cache.cron.job';
9-
import { GmailPartialSyncCronJob } from 'src/modules/messaging/crons/jobs/gmail-partial-sync.cron.job';
10-
import { GmailFetchMessageContentFromCacheModule } from 'src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.module';
8+
import { GmailMessagesImportCronJob } from 'src/modules/messaging/crons/jobs/gmail-messages-import.cron.job';
9+
import { GmailMessageListFetchCronJob } from 'src/modules/messaging/crons/jobs/gmail-message-list-fetch.cron.job';
10+
import { GmailMessagesImportModule } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import.module';
1111
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
1212

1313
@Module({
1414
imports: [
1515
TypeOrmModule.forFeature([Workspace, FeatureFlagEntity], 'core'),
1616
TypeOrmModule.forFeature([DataSourceEntity], 'metadata'),
1717
ObjectMetadataRepositoryModule.forFeature([MessageChannelWorkspaceEntity]),
18-
GmailFetchMessageContentFromCacheModule,
18+
GmailMessagesImportModule,
1919
],
2020
providers: [
2121
{
22-
provide: GmailFetchMessagesFromCacheCronJob.name,
23-
useClass: GmailFetchMessagesFromCacheCronJob,
22+
provide: GmailMessagesImportCronJob.name,
23+
useClass: GmailMessagesImportCronJob,
2424
},
2525
{
26-
provide: GmailPartialSyncCronJob.name,
27-
useClass: GmailPartialSyncCronJob,
26+
provide: GmailMessageListFetchCronJob.name,
27+
useClass: GmailMessageListFetchCronJob,
2828
},
2929
],
3030
})

0 commit comments

Comments
 (0)