Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

5615 create messageongoingstalecron #6005

Merged
merged 14 commits into from
Jun 25, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { Command, CommandRunner } from 'nest-commander';

import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { MessagingOngoingStaleCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-ongoing-stale.cron.job';

const MESSAGING_ONGOING_STALE_CRON_PATTERN = '0 * * * *';

@Command({
name: 'cron:messaging:ongoing-stale',
description:
'Starts a cron job to check for stale ongoing message imports and put them back to pending',
})
export class MessagingOngoingStaleCronCommand extends CommandRunner {
constructor(
@InjectMessageQueue(MessageQueue.cronQueue)
private readonly messageQueueService: MessageQueueService,
) {
super();
}

async run(): Promise<void> {
await this.messageQueueService.addCron<undefined>(
MessagingOngoingStaleCronJob.name,
undefined,
{
repeat: { pattern: MESSAGING_ONGOING_STALE_CRON_PATTERN },
},
);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Inject, Injectable, Logger } from '@nestjs/common';
import { Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';

import { Repository, In } from 'typeorm';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { InjectRepository } from '@nestjs/typeorm';

import { Repository, In } from 'typeorm';

import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity';
import { EnvironmentService } from 'src/engine/integrations/environment/environment.service';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import {
MessagingOngoingStaleJobData,
MessagingOngoingStaleJob,
} from 'src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job';

@Processor(MessageQueue.cronQueue)
export class MessagingOngoingStaleCronJob {
constructor(
@InjectRepository(Workspace, 'core')
private readonly workspaceRepository: Repository<Workspace>,
@InjectRepository(DataSourceEntity, 'metadata')
private readonly dataSourceRepository: Repository<DataSourceEntity>,
private readonly environmentService: EnvironmentService,
@InjectMessageQueue(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
) {}

@Process(MessagingOngoingStaleCronJob.name)
async handle(): Promise<void> {
const workspaceIds = (
await this.workspaceRepository.find({
where: this.environmentService.get('IS_BILLING_ENABLED')
? {
subscriptionStatus: In(['active', 'trialing', 'past_due']),
}
: {},
select: ['id'],
})
).map((workspace) => workspace.id);

const dataSources = await this.dataSourceRepository.find({
where: {
workspaceId: In(workspaceIds),
},
});

const workspaceIdsWithDataSources = new Set(
dataSources.map((dataSource) => dataSource.workspaceId),
);

for (const workspaceId of workspaceIdsWithDataSources) {
await this.messageQueueService.add<MessagingOngoingStaleJobData>(
MessagingOngoingStaleJob.name,
{
workspaceId,
},
);
}
bosiraphael marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { Logger, Scope } from '@nestjs/common';

import { In } from 'typeorm';

import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import {
MessageChannelSyncStage,
MessageChannelWorkspaceEntity,
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { isSyncStale } from 'src/modules/messaging/message-import-manager/utils/is-sync-stale.util';
import { WorkspaceRepository } from 'src/engine/twenty-orm/repository/workspace.repository';
import { InjectWorkspaceRepository } from 'src/engine/twenty-orm/decorators/inject-workspace-repository.decorator';
import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service';

export type MessagingOngoingStaleJobData = {
workspaceId: string;
};

@Processor({
queueName: MessageQueue.messagingQueue,
scope: Scope.REQUEST,
})
export class MessagingOngoingStaleJob {
private readonly logger = new Logger(MessagingOngoingStaleJob.name);
constructor(
@InjectWorkspaceRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: WorkspaceRepository<MessageChannelWorkspaceEntity>,
private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService,
) {}

@Process(MessagingOngoingStaleJob.name)
async handle(data: MessagingOngoingStaleJobData): Promise<void> {
const { workspaceId } = data;

const messageChannels = await this.messageChannelRepository.find({
where: {
syncStage: In([
MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING,
MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING,
]),
},
});

for (const messageChannel of messageChannels) {
if (
messageChannel.syncStageStartedAt &&
isSyncStale(messageChannel.syncStageStartedAt)
) {
this.logger.log(
`Sync for message channel ${messageChannel.id} and workspace ${workspaceId} is stale. Setting sync stage to MESSAGES_IMPORT_PENDING`,
);

switch (messageChannel.syncStage) {
case MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING:
await this.messagingChannelSyncStatusService.schedulePartialMessageListFetch(
messageChannel.id,
workspaceId,
);
break;
case MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING:
await this.messagingChannelSyncStatusService.scheduleMessagesImport(
messageChannel.id,
workspaceId,
);
break;
default:
break;
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,41 @@ import { TypeOrmModule } from '@nestjs/typeorm';

import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity';
import { TwentyORMModule } from 'src/engine/twenty-orm/twenty-orm.module';
import { MessagingCommonModule } from 'src/modules/messaging/common/messaging-common.module';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MessagingSingleMessageImportCommand } from 'src/modules/messaging/message-import-manager/commands/messaging-single-message-import.command';
import { MessagingMessageListFetchCronCommand } from 'src/modules/messaging/message-import-manager/crons/commands/messaging-message-list-fetch.cron.command';
import { MessagingMessagesImportCronCommand } from 'src/modules/messaging/message-import-manager/crons/commands/messaging-messages-import.cron.command';
import { MessagingOngoingStaleCronCommand } from 'src/modules/messaging/message-import-manager/crons/commands/messaging-ongoing-stale.cron.command';
import { MessagingMessageListFetchCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job';
import { MessagingMessagesImportCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job';
import { MessagingOngoingStaleCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-ongoing-stale.cron.job';
import { MessagingGmailDriverModule } from 'src/modules/messaging/message-import-manager/drivers/gmail/messaging-gmail-driver.module';
import { MessagingAddSingleMessageToCacheForImportJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-add-single-message-to-cache-for-import.job';
import { MessagingMessageListFetchJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job';
import { MessagingMessagesImportJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job';
import { MessagingOngoingStaleJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job';

@Module({
imports: [
MessagingGmailDriverModule,
MessagingCommonModule,
TypeOrmModule.forFeature([Workspace], 'core'),
TypeOrmModule.forFeature([DataSourceEntity], 'metadata'),
TwentyORMModule.forFeature([MessageChannelWorkspaceEntity]),
],
providers: [
MessagingMessageListFetchCronCommand,
MessagingMessagesImportCronCommand,
MessagingOngoingStaleCronCommand,
MessagingSingleMessageImportCommand,
MessagingMessageListFetchJob,
MessagingMessagesImportJob,
MessagingOngoingStaleJob,
MessagingMessageListFetchCronJob,
MessagingMessagesImportCronJob,
MessagingOngoingStaleCronJob,
MessagingAddSingleMessageToCacheForImportJob,
],
exports: [],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { MESSAGING_IMPORT_ONGOING_SYNC_TIMEOUT } from 'src/modules/messaging/message-import-manager/constants/messaging-import-ongoing-sync-timeout.constant';
import { isSyncStale } from 'src/modules/messaging/message-import-manager/utils/is-sync-stale.util';

jest.useFakeTimers().setSystemTime(new Date('2024-01-01'));

describe('isSyncStale', () => {
it('should return true if sync is stale', () => {
const syncStageStartedAt = new Date(
Date.now() - MESSAGING_IMPORT_ONGOING_SYNC_TIMEOUT - 1,
bosiraphael marked this conversation as resolved.
Show resolved Hide resolved
).toISOString();

const result = isSyncStale(syncStageStartedAt);

expect(result).toBe(true);
});

it('should return false if sync is not stale', () => {
const syncStageStartedAt = new Date(
Date.now() - MESSAGING_IMPORT_ONGOING_SYNC_TIMEOUT + 1,
).toISOString();

const result = isSyncStale(syncStageStartedAt);

expect(result).toBe(false);
});

it('should return false if syncStageStartedAt is invalid', () => {
const syncStageStartedAt = 'invalid-date';

const result = isSyncStale(syncStageStartedAt);

expect(result).toBe(false);
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { MESSAGING_IMPORT_ONGOING_SYNC_TIMEOUT } from 'src/modules/messaging/message-import-manager/constants/messaging-import-ongoing-sync-timeout.constant';

export const isSyncStale = (syncStageStartedAt: string): boolean => {
try {
const syncStageStartedTime = new Date(syncStageStartedAt).getTime();

if (isNaN(syncStageStartedTime)) {
throw new Error('Invalid date format');
}

return (
Date.now() - syncStageStartedTime > MESSAGING_IMPORT_ONGOING_SYNC_TIMEOUT
);
} catch (error) {
return false;
bosiraphael marked this conversation as resolved.
Show resolved Hide resolved
}
};
Loading