-
Notifications
You must be signed in to change notification settings - Fork 2.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[messaging] add cronjob for workspaces messages partial sync (#3800)
* [messaging] add cronjob for workspaces messages partial sync * run cron every 10 minutes * use logger
- Loading branch information
Showing
11 changed files
with
163 additions
and
38 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,27 +1,9 @@ | ||
import { Module } from '@nestjs/common'; | ||
|
||
import { EnvironmentModule } from 'src/integrations/environment/environment.module'; | ||
import { EnvironmentService } from 'src/integrations/environment/environment.service'; | ||
import { LoggerModule } from 'src/integrations/logger/logger.module'; | ||
import { loggerModuleFactory } from 'src/integrations/logger/logger.module-factory'; | ||
import { JobsModule } from 'src/integrations/message-queue/jobs.module'; | ||
import { MessageQueueModule } from 'src/integrations/message-queue/message-queue.module'; | ||
import { messageQueueModuleFactory } from 'src/integrations/message-queue/message-queue.module-factory'; | ||
import { IntegrationsModule } from 'src/integrations/integrations.module'; | ||
|
||
@Module({ | ||
imports: [ | ||
EnvironmentModule.forRoot({}), | ||
LoggerModule.forRootAsync({ | ||
useFactory: loggerModuleFactory, | ||
inject: [EnvironmentService], | ||
}), | ||
MessageQueueModule.forRoot({ | ||
useFactory: messageQueueModuleFactory, | ||
inject: [EnvironmentService], | ||
}), | ||
JobsModule, | ||
IntegrationsModule, | ||
], | ||
imports: [IntegrationsModule, JobsModule], | ||
}) | ||
export class QueueWorkerModule {} |
29 changes: 29 additions & 0 deletions
29
...etch-all-workspaces-messages/commands/start-fetch-all-workspaces-messages.cron.command.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
import { Inject } from '@nestjs/common'; | ||
|
||
import { Command, CommandRunner } from 'nest-commander'; | ||
|
||
import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants'; | ||
import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service'; | ||
import { fetchAllWorkspacesMessagesCronPattern } from 'src/workspace/cron/fetch-all-workspaces-messages/fetch-all-workspaces-messages.cron.pattern'; | ||
import { FetchAllWorkspacesMessagesJob } from 'src/workspace/cron/fetch-all-workspaces-messages/fetch-all-workspaces-messages.job'; | ||
|
||
@Command({ | ||
name: 'fetch-all-workspaces-messages:cron:start', | ||
description: 'Starts a cron job to fetch all workspaces messages', | ||
}) | ||
export class StartFetchAllWorkspacesMessagesCronCommand extends CommandRunner { | ||
constructor( | ||
@Inject(MessageQueue.cronQueue) | ||
private readonly messageQueueService: MessageQueueService, | ||
) { | ||
super(); | ||
} | ||
|
||
async run(): Promise<void> { | ||
await this.messageQueueService.addCron<undefined>( | ||
FetchAllWorkspacesMessagesJob.name, | ||
undefined, | ||
fetchAllWorkspacesMessagesCronPattern, | ||
); | ||
} | ||
} |
28 changes: 28 additions & 0 deletions
28
...fetch-all-workspaces-messages/commands/stop-fetch-all-workspaces-messages.cron.command.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
import { Inject } from '@nestjs/common'; | ||
|
||
import { Command, CommandRunner } from 'nest-commander'; | ||
|
||
import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants'; | ||
import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service'; | ||
import { fetchAllWorkspacesMessagesCronPattern } from 'src/workspace/cron/fetch-all-workspaces-messages/fetch-all-workspaces-messages.cron.pattern'; | ||
import { FetchAllWorkspacesMessagesJob } from 'src/workspace/cron/fetch-all-workspaces-messages/fetch-all-workspaces-messages.job'; | ||
|
||
@Command({ | ||
name: 'fetch-all-workspaces-messages:cron:stop', | ||
description: 'Stops the fetch all workspaces messages cron job', | ||
}) | ||
export class StopFetchAllWorkspacesMessagesCronCommand extends CommandRunner { | ||
constructor( | ||
@Inject(MessageQueue.cronQueue) | ||
private readonly messageQueueService: MessageQueueService, | ||
) { | ||
super(); | ||
} | ||
|
||
async run(): Promise<void> { | ||
await this.messageQueueService.removeCron( | ||
FetchAllWorkspacesMessagesJob.name, | ||
fetchAllWorkspacesMessagesCronPattern, | ||
); | ||
} | ||
} |
1 change: 1 addition & 0 deletions
1
...orkspace/cron/fetch-all-workspaces-messages/fetch-all-workspaces-messages.cron.pattern.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
export const fetchAllWorkspacesMessagesCronPattern = '*/10 * * * *'; |
66 changes: 66 additions & 0 deletions
66
...ver/src/workspace/cron/fetch-all-workspaces-messages/fetch-all-workspaces-messages.job.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
import { Inject, Injectable } from '@nestjs/common'; | ||
import { InjectRepository } from '@nestjs/typeorm'; | ||
|
||
import { Repository } from 'typeorm'; | ||
|
||
import { MessageQueueJob } from 'src/integrations/message-queue/interfaces/message-queue-job.interface'; | ||
|
||
import { | ||
FeatureFlagEntity, | ||
FeatureFlagKeys, | ||
} from 'src/core/feature-flag/feature-flag.entity'; | ||
import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants'; | ||
import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service'; | ||
import { ConnectedAccountService } from 'src/workspace/messaging/connected-account/connected-account.service'; | ||
import { | ||
GmailPartialSyncJobData, | ||
GmailPartialSyncJob, | ||
} from 'src/workspace/messaging/jobs/gmail-partial-sync.job'; | ||
|
||
@Injectable() | ||
export class FetchAllWorkspacesMessagesJob | ||
implements MessageQueueJob<undefined> | ||
{ | ||
constructor( | ||
@InjectRepository(FeatureFlagEntity, 'core') | ||
private readonly featureFlagRepository: Repository<FeatureFlagEntity>, | ||
@Inject(MessageQueue.messagingQueue) | ||
private readonly messageQueueService: MessageQueueService, | ||
private readonly connectedAccountService: ConnectedAccountService, | ||
) {} | ||
|
||
async handle(): Promise<void> { | ||
const featureFlagsWithMessagingEnabled = | ||
await this.featureFlagRepository.findBy({ | ||
key: FeatureFlagKeys.IsMessagingEnabled, | ||
value: true, | ||
}); | ||
|
||
const workspaceIds = featureFlagsWithMessagingEnabled.map( | ||
(featureFlag) => featureFlag.workspaceId, | ||
); | ||
|
||
for (const workspaceId of workspaceIds) { | ||
await this.fetchWorkspaceMessages(workspaceId); | ||
} | ||
} | ||
|
||
private async fetchWorkspaceMessages(workspaceId: string): Promise<void> { | ||
const connectedAccounts = | ||
await this.connectedAccountService.getAll(workspaceId); | ||
|
||
for (const connectedAccount of connectedAccounts) { | ||
await this.messageQueueService.add<GmailPartialSyncJobData>( | ||
GmailPartialSyncJob.name, | ||
{ | ||
workspaceId, | ||
connectedAccountId: connectedAccount.id, | ||
}, | ||
{ | ||
id: `${workspaceId}-${connectedAccount.id}`, | ||
retryLimit: 2, | ||
}, | ||
); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters