Skip to content

Commit

Permalink
5898 Create a cron to monitor messageChannelSyncStatus (#5933)
Browse files Browse the repository at this point in the history
Closes #5898
  • Loading branch information
bosiraphael authored Jun 19, 2024
1 parent 016132e commit 86f95c0
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { EnvironmentService } from 'src/engine/integrations/environment/environm

type MessagingTelemetryTrackInput = {
eventName: string;
workspaceId: string;
workspaceId?: string;
userId?: string;
connectedAccountId?: string;
messageChannelId?: string;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import { MessagingBlocklistManagerModule } from 'src/modules/messaging/blocklist
import { MessagingMessageCleanerModule } from 'src/modules/messaging/message-cleaner/messaging-message-cleaner.module';
import { MessagingImportManagerModule } from 'src/modules/messaging/message-import-manager/messaging-import-manager.module';
import { MessaginParticipantsManagerModule } from 'src/modules/messaging/message-participants-manager/messaging-participants-manager.module';
import { MessagingMonitoringModule } from 'src/modules/messaging/monitoring/messaging-monitoring.module';

@Module({
imports: [
MessagingImportManagerModule,
MessagingMessageCleanerModule,
MessaginParticipantsManagerModule,
MessagingBlocklistManagerModule,
MessagingMonitoringModule,
],
providers: [],
exports: [],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { Command, CommandRunner } from 'nest-commander';

import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { MessagingMessageChannelSyncStatusMonitoringCronJob } from 'src/modules/messaging/monitoring/crons/jobs/messaging-message-channel-sync-status-monitoring.cron';

const MESSAGING_MESSAGE_CHANNEL_SYNC_STATUS_MONITORING_CRON_PATTERN =
'0 * * * *';

@Command({
name: 'cron:messaging:monitoring:message-channel-sync-status',
description:
'Starts a cron job to monitor the sync status of message channels',
})
export class MessagingMessageChannelSyncStatusMonitoringCronCommand extends CommandRunner {
constructor(
@InjectMessageQueue(MessageQueue.cronQueue)
private readonly messageQueueService: MessageQueueService,
) {
super();
}

async run(): Promise<void> {
await this.messageQueueService.addCron<undefined>(
MessagingMessageChannelSyncStatusMonitoringCronJob.name,
undefined,
{
repeat: {
pattern:
MESSAGING_MESSAGE_CHANNEL_SYNC_STATUS_MONITORING_CRON_PATTERN,
},
},
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import { Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';

import { Repository, In } from 'typeorm';
import snakeCase from 'lodash.snakecase';

import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { EnvironmentService } from 'src/engine/integrations/environment/environment.service';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
import { MessagingTelemetryService } from 'src/modules/messaging/common/services/messaging-telemetry.service';

@Processor(MessageQueue.cronQueue)
export class MessagingMessageChannelSyncStatusMonitoringCronJob {
private readonly logger = new Logger(
MessagingMessageChannelSyncStatusMonitoringCronJob.name,
);

constructor(
@InjectRepository(Workspace, 'core')
private readonly workspaceRepository: Repository<Workspace>,
@InjectRepository(DataSourceEntity, 'metadata')
private readonly dataSourceRepository: Repository<DataSourceEntity>,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
private readonly environmentService: EnvironmentService,
private readonly messagingTelemetryService: MessagingTelemetryService,
) {}

@Process(MessagingMessageChannelSyncStatusMonitoringCronJob.name)
async handle(): Promise<void> {
this.logger.log('Starting message channel sync status monitoring...');

await this.messagingTelemetryService.track({
eventName: 'message_channel.monitoring.sync_status.start',
message: 'Starting message channel sync status monitoring',
});

const workspaceIds = (
await this.workspaceRepository.find({
where: this.environmentService.get('IS_BILLING_ENABLED')
? {
subscriptionStatus: In(['active', 'trialing', 'past_due']),
}
: {},
select: ['id'],
})
).map((workspace) => workspace.id);

const dataSources = await this.dataSourceRepository.find({
where: {
workspaceId: In(workspaceIds),
},
});

const workspaceIdsWithDataSources = new Set(
dataSources.map((dataSource) => dataSource.workspaceId),
);

for (const workspaceId of workspaceIdsWithDataSources) {
const messageChannels =
await this.messageChannelRepository.getAll(workspaceId);

for (const messageChannel of messageChannels) {
await this.messagingTelemetryService.track({
eventName: `message_channel.monitoring.sync_status.${snakeCase(
messageChannel.syncStatus,
)}`,
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
messageChannelId: messageChannel.id,
message: messageChannel.syncStatus,
});
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';

import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity';
import { MessagingCommonModule } from 'src/modules/messaging/common/messaging-common.module';
import { MessagingMessageChannelSyncStatusMonitoringCronCommand } from 'src/modules/messaging/monitoring/crons/commands/messaging-message-channel-sync-status-monitoring.cron.command';
import { MessagingMessageChannelSyncStatusMonitoringCronJob } from 'src/modules/messaging/monitoring/crons/jobs/messaging-message-channel-sync-status-monitoring.cron';

@Module({
imports: [
MessagingCommonModule,
TypeOrmModule.forFeature([Workspace], 'core'),
TypeOrmModule.forFeature([DataSourceEntity], 'metadata'),
],
providers: [
MessagingMessageChannelSyncStatusMonitoringCronCommand,
MessagingMessageChannelSyncStatusMonitoringCronJob,
],
exports: [],
})
export class MessagingMonitoringModule {}

0 comments on commit 86f95c0

Please sign in to comment.