Skip to content

Commit

Permalink
5507 modify the partial sync cron to work with the new statuses (#5512)
Browse files Browse the repository at this point in the history
Closes #5507
  • Loading branch information
bosiraphael authored and Weiko committed May 31, 2024
1 parent 6309c2c commit 26a9598
Show file tree
Hide file tree
Showing 31 changed files with 1,185 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ import {
MessageChannelVisibility,
} from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
import {
GmailFullSyncJobData,
GmailFullSyncJob,
} from 'src/modules/messaging/jobs/gmail-full-sync.job';
GmailFullMessageListFetchJobData,
GmailFullMessageListFetchJob,
} from 'src/modules/messaging/jobs/gmail-full-message-list-fetch.job';

@Injectable()
export class GoogleAPIsService {
Expand Down Expand Up @@ -156,8 +156,8 @@ export class GoogleAPIsService {
isCalendarEnabled: boolean,
) {
if (this.environmentService.get('MESSAGING_PROVIDER_GMAIL_ENABLED')) {
await this.messageQueueService.add<GmailFullSyncJobData>(
GmailFullSyncJob.name,
await this.messageQueueService.add<GmailFullMessageListFetchJobData>(
GmailFullMessageListFetchJob.name,
{
workspaceId,
connectedAccountId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import { DataSourceModule } from 'src/engine/metadata-modules/data-source/data-s
import { ObjectMetadataModule } from 'src/engine/metadata-modules/object-metadata/object-metadata.module';
import { CleanInactiveWorkspaceJob } from 'src/engine/workspace-manager/workspace-cleaner/crons/clean-inactive-workspace.job';
import { CalendarEventParticipantModule } from 'src/modules/calendar/services/calendar-event-participant/calendar-event-participant.module';
import { GmailFetchMessageContentFromCacheModule } from 'src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.module';
import { GmailFullSyncModule } from 'src/modules/messaging/services/gmail-full-sync/gmail-full-sync.module';
import { GmailPartialSyncModule } from 'src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.module';
import { GmailMessagesImportModule } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import.module';
import { GmailFullMessageListFetchModule } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.module';
import { GmailPartialMessageListFetchModule } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.module';
import { TimelineActivityModule } from 'src/modules/timeline/timeline-activity.module';
import { WorkspaceModule } from 'src/engine/core-modules/workspace/workspace.module';
import { CalendarMessagingParticipantJobModule } from 'src/modules/calendar-messaging-participant/jobs/calendar-messaging-participant-job.module';
Expand All @@ -41,9 +41,9 @@ import { TimelineJobModule } from 'src/modules/timeline/jobs/timeline-job.module
BillingModule,
UserWorkspaceModule,
WorkspaceModule,
GmailFullSyncModule,
GmailFetchMessageContentFromCacheModule,
GmailPartialSyncModule,
GmailFullMessageListFetchModule,
GmailMessagesImportModule,
GmailPartialMessageListFetchModule,
CalendarEventParticipantModule,
TimelineActivityModule,
StripeModule,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,4 +254,30 @@ export class ConnectedAccountRepository {
transactionManager,
);
}

public async getConnectedAccountOrThrow(
workspaceId: string,
connectedAccountId: string,
): Promise<ObjectRecord<ConnectedAccountWorkspaceEntity>> {
const connectedAccount = await this.getById(
connectedAccountId,
workspaceId,
);

if (!connectedAccount) {
throw new Error(
`Connected account ${connectedAccountId} not found in workspace ${workspaceId}`,
);
}

const refreshToken = connectedAccount.refreshToken;

if (!refreshToken) {
throw new Error(
`No refresh token found for connected account ${connectedAccountId} in workspace ${workspaceId}`,
);
}

return connectedAccount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ import { Command, CommandRunner } from 'nest-commander';

import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { GmailPartialSyncCronJob } from 'src/modules/messaging/crons/jobs/gmail-partial-sync.cron.job';
import { GmailMessageListFetchCronJob } from 'src/modules/messaging/crons/jobs/gmail-message-list-fetch.cron.job';

const GMAIL_PARTIAL_SYNC_CRON_PATTERN = '*/5 * * * *';

@Command({
name: 'cron:messaging:gmail-partial-sync',
name: 'cron:messaging:gmail-message-list-fetch',
description:
'Starts a cron job to sync existing connected account messages and store them in the cache',
})
export class GmailPartialSyncCronCommand extends CommandRunner {
export class GmailMessageListFetchCronCommand extends CommandRunner {
constructor(
@Inject(MessageQueue.cronQueue)
private readonly messageQueueService: MessageQueueService,
Expand All @@ -23,7 +23,7 @@ export class GmailPartialSyncCronCommand extends CommandRunner {

async run(): Promise<void> {
await this.messageQueueService.addCron<undefined>(
GmailPartialSyncCronJob.name,
GmailMessageListFetchCronJob.name,
undefined,
{
repeat: { pattern: GMAIL_PARTIAL_SYNC_CRON_PATTERN },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import { Command, CommandRunner } from 'nest-commander';

import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { GmailFetchMessagesFromCacheCronJob } from 'src/modules/messaging/crons/jobs/gmail-fetch-messages-from-cache.cron.job';
import { GmailMessagesImportCronJob } from 'src/modules/messaging/crons/jobs/gmail-messages-import.cron.job';

@Command({
name: 'cron:messaging:gmail-fetch-messages-from-cache',
name: 'cron:messaging:gmail-messages-import',
description: 'Starts a cron job to fetch all messages from cache',
})
export class GmailFetchMessagesFromCacheCronCommand extends CommandRunner {
export class GmailMessagesImportCronCommand extends CommandRunner {
constructor(
@Inject(MessageQueue.cronQueue)
private readonly messageQueueService: MessageQueueService,
Expand All @@ -20,7 +20,7 @@ export class GmailFetchMessagesFromCacheCronCommand extends CommandRunner {

async run(): Promise<void> {
await this.messageQueueService.addCron<undefined>(
GmailFetchMessagesFromCacheCronJob.name,
GmailMessagesImportCronJob.name,
undefined,
{
repeat: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,14 @@ import { Module } from '@nestjs/common';

import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { GmailFetchMessagesFromCacheCronCommand } from 'src/modules/messaging/crons/commands/gmail-fetch-messages-from-cache.cron.command';
import { GmailPartialSyncCronCommand } from 'src/modules/messaging/crons/commands/gmail-partial-sync.cron.command';
import { GmailMessagesImportCronCommand } from 'src/modules/messaging/crons/commands/gmail-messages-import.cron.command';
import { GmailMessageListFetchCronCommand } from 'src/modules/messaging/crons/commands/gmail-message-list-fetch.cron.command';
@Module({
imports: [
ObjectMetadataRepositoryModule.forFeature([
ConnectedAccountWorkspaceEntity,
]),
],
providers: [
GmailPartialSyncCronCommand,
GmailFetchMessagesFromCacheCronCommand,
],
providers: [GmailMessageListFetchCronCommand, GmailMessagesImportCronCommand],
})
export class MessagingCronCommandsModule {}
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,29 @@ import { MessageQueueJob } from 'src/engine/integrations/message-queue/interface
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity';
import {
GmailPartialSyncJobData,
GmailPartialSyncJob,
} from 'src/modules/messaging/jobs/gmail-partial-sync.job';
GmailPartialMessageListFetchJobData,
GmailPartialMessageListFetchJob,
} from 'src/modules/messaging/jobs/gmail-partial-message-list-fetch.job';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
import { EnvironmentService } from 'src/engine/integrations/environment/environment.service';
import {
FeatureFlagEntity,
FeatureFlagKeys,
} from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import {
GmailMessageListFetchJobData,
GmailMessageListFetchJob,
} from 'src/modules/messaging/jobs/gmail-message-list-fetch.job';

@Injectable()
export class GmailPartialSyncCronJob implements MessageQueueJob<undefined> {
private readonly logger = new Logger(GmailPartialSyncCronJob.name);
export class GmailMessageListFetchCronJob
implements MessageQueueJob<undefined>
{
private readonly logger = new Logger(GmailMessageListFetchCronJob.name);

constructor(
@InjectRepository(Workspace, 'core')
Expand All @@ -32,6 +42,8 @@ export class GmailPartialSyncCronJob implements MessageQueueJob<undefined> {
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
private readonly environmentService: EnvironmentService,
@InjectRepository(FeatureFlagEntity, 'core')
private readonly featureFlagRepository: Repository<FeatureFlagEntity>,
) {}

async handle(): Promise<void> {
Expand All @@ -57,11 +69,20 @@ export class GmailPartialSyncCronJob implements MessageQueueJob<undefined> {
);

for (const workspaceId of workspaceIdsWithDataSources) {
await this.enqueuePartialSyncs(workspaceId);
await this.enqueueSyncs(workspaceId);
}
}

private async enqueuePartialSyncs(workspaceId: string): Promise<void> {
private async enqueueSyncs(workspaceId: string): Promise<void> {
const isGmailSyncV2EnabledFeatureFlag =
await this.featureFlagRepository.findOneBy({
workspaceId: workspaceId,
key: FeatureFlagKeys.IsGmailSyncV2Enabled,
value: true,
});

const isGmailSyncV2Enabled = isGmailSyncV2EnabledFeatureFlag?.value;

try {
const messageChannels =
await this.messageChannelRepository.getAll(workspaceId);
Expand All @@ -71,16 +92,29 @@ export class GmailPartialSyncCronJob implements MessageQueueJob<undefined> {
continue;
}

await this.messageQueueService.add<GmailPartialSyncJobData>(
GmailPartialSyncJob.name,
{
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
},
{
retryLimit: 2,
},
);
if (isGmailSyncV2Enabled) {
await this.messageQueueService.add<GmailMessageListFetchJobData>(
GmailMessageListFetchJob.name,
{
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
},
{
retryLimit: 2,
},
);
} else {
await this.messageQueueService.add<GmailPartialMessageListFetchJobData>(
GmailPartialMessageListFetchJob.name,
{
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
},
{
retryLimit: 2,
},
);
}
}
} catch (error) {
this.logger.error(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Injectable } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';

import { Repository, In } from 'typeorm';
Expand All @@ -10,22 +10,34 @@ import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-s
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
import { GmailFetchMessageContentFromCacheService } from 'src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.service';
import { GmailMessagesImportService } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import.service';
import { EnvironmentService } from 'src/engine/integrations/environment/environment.service';
import { GmailMessagesImportV2Service } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import-v2.service';
import {
FeatureFlagEntity,
FeatureFlagKeys,
} from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';

@Injectable()
export class GmailFetchMessagesFromCacheCronJob
implements MessageQueueJob<undefined>
{
export class GmailMessagesImportCronJob implements MessageQueueJob<undefined> {
private readonly logger = new Logger(GmailMessagesImportCronJob.name);

constructor(
@InjectRepository(Workspace, 'core')
private readonly workspaceRepository: Repository<Workspace>,
@InjectRepository(DataSourceEntity, 'metadata')
private readonly dataSourceRepository: Repository<DataSourceEntity>,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
private readonly gmailFetchMessageContentFromCacheService: GmailFetchMessageContentFromCacheService,
private readonly gmailFetchMessageContentFromCacheService: GmailMessagesImportService,
private readonly gmailFetchMessageContentFromCacheV2Service: GmailMessagesImportV2Service,
@InjectRepository(FeatureFlagEntity, 'core')
private readonly featureFlagRepository: Repository<FeatureFlagEntity>,
private readonly environmentService: EnvironmentService,
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
private readonly connectedAccountRepository: ConnectedAccountRepository,
) {}

async handle(): Promise<void> {
Expand Down Expand Up @@ -59,11 +71,42 @@ export class GmailFetchMessagesFromCacheCronJob
const messageChannels =
await this.messageChannelRepository.getAll(workspaceId);

const isGmailSyncV2EnabledFeatureFlag =
await this.featureFlagRepository.findOneBy({
workspaceId: workspaceId,
key: FeatureFlagKeys.IsGmailSyncV2Enabled,
value: true,
});

const isGmailSyncV2Enabled = isGmailSyncV2EnabledFeatureFlag?.value;

for (const messageChannel of messageChannels) {
await this.gmailFetchMessageContentFromCacheService.fetchMessageContentFromCache(
workspaceId,
messageChannel.connectedAccountId,
);
if (!messageChannel?.isSyncEnabled) {
continue;
}

if (isGmailSyncV2Enabled) {
try {
const connectedAccount =
await this.connectedAccountRepository.getConnectedAccountOrThrow(
workspaceId,
messageChannel.connectedAccountId,
);

await this.gmailFetchMessageContentFromCacheV2Service.processMessageBatchImport(
messageChannel,
connectedAccount,
workspaceId,
);
} catch (error) {
this.logger.log(error.message);
}
} else {
await this.gmailFetchMessageContentFromCacheService.fetchMessageContentFromCache(
workspaceId,
messageChannel.connectedAccountId,
);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,26 @@ import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity';
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
import { GmailFetchMessagesFromCacheCronJob } from 'src/modules/messaging/crons/jobs/gmail-fetch-messages-from-cache.cron.job';
import { GmailPartialSyncCronJob } from 'src/modules/messaging/crons/jobs/gmail-partial-sync.cron.job';
import { GmailFetchMessageContentFromCacheModule } from 'src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.module';
import { GmailMessagesImportCronJob } from 'src/modules/messaging/crons/jobs/gmail-messages-import.cron.job';
import { GmailMessageListFetchCronJob } from 'src/modules/messaging/crons/jobs/gmail-message-list-fetch.cron.job';
import { GmailMessagesImportModule } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import.module';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';

@Module({
imports: [
TypeOrmModule.forFeature([Workspace, FeatureFlagEntity], 'core'),
TypeOrmModule.forFeature([DataSourceEntity], 'metadata'),
ObjectMetadataRepositoryModule.forFeature([MessageChannelWorkspaceEntity]),
GmailFetchMessageContentFromCacheModule,
GmailMessagesImportModule,
],
providers: [
{
provide: GmailFetchMessagesFromCacheCronJob.name,
useClass: GmailFetchMessagesFromCacheCronJob,
provide: GmailMessagesImportCronJob.name,
useClass: GmailMessagesImportCronJob,
},
{
provide: GmailPartialSyncCronJob.name,
useClass: GmailPartialSyncCronJob,
provide: GmailMessageListFetchCronJob.name,
useClass: GmailMessageListFetchCronJob,
},
],
})
Expand Down
Loading

0 comments on commit 26a9598

Please sign in to comment.