diff --git a/packages/twenty-server/src/modules/connected-account/query-hooks/connected-account-query-hook.module.ts b/packages/twenty-server/src/modules/connected-account/query-hooks/connected-account-query-hook.module.ts index d6fb5b434735..e98d7dba83c4 100644 --- a/packages/twenty-server/src/modules/connected-account/query-hooks/connected-account-query-hook.module.ts +++ b/packages/twenty-server/src/modules/connected-account/query-hooks/connected-account-query-hook.module.ts @@ -1,16 +1,23 @@ import { Module } from '@nestjs/common'; +import { TwentyORMModule } from 'src/engine/twenty-orm/twenty-orm.module'; import { BlocklistCreateManyPreQueryHook } from 'src/modules/connected-account/query-hooks/blocklist/blocklist-create-many.pre-query.hook'; import { BlocklistUpdateManyPreQueryHook } from 'src/modules/connected-account/query-hooks/blocklist/blocklist-update-many.pre-query.hook'; import { BlocklistUpdateOnePreQueryHook } from 'src/modules/connected-account/query-hooks/blocklist/blocklist-update-one.pre-query.hook'; +import { ConnectedAccountDeleteOnePreQueryHook } from 'src/modules/connected-account/query-hooks/connected-account/connected-account-delete-one.pre-query.hook'; import { BlocklistValidationModule } from 'src/modules/connected-account/services/blocklist/blocklist-validation.module'; +import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; @Module({ - imports: [BlocklistValidationModule], + imports: [ + BlocklistValidationModule, + TwentyORMModule.forFeature([MessageChannelWorkspaceEntity]), + ], providers: [ BlocklistCreateManyPreQueryHook, BlocklistUpdateManyPreQueryHook, BlocklistUpdateOnePreQueryHook, + ConnectedAccountDeleteOnePreQueryHook, ], }) export class ConnectedAccountQueryHookModule {} diff --git a/packages/twenty-server/src/modules/connected-account/query-hooks/connected-account/connected-account-delete-one.pre-query.hook.ts b/packages/twenty-server/src/modules/connected-account/query-hooks/connected-account/connected-account-delete-one.pre-query.hook.ts new file mode 100644 index 000000000000..9296263df119 --- /dev/null +++ b/packages/twenty-server/src/modules/connected-account/query-hooks/connected-account/connected-account-delete-one.pre-query.hook.ts @@ -0,0 +1,43 @@ +import { EventEmitter2 } from '@nestjs/event-emitter'; + +import { WorkspaceQueryHookInstance } from 'src/engine/api/graphql/workspace-query-runner/workspace-query-hook/interfaces/workspace-query-hook.interface'; +import { DeleteOneResolverArgs } from 'src/engine/api/graphql/workspace-resolver-builder/interfaces/workspace-resolvers-builder.interface'; + +import { WorkspaceQueryHook } from 'src/engine/api/graphql/workspace-query-runner/workspace-query-hook/decorators/workspace-query-hook.decorator'; +import { InjectWorkspaceRepository } from 'src/engine/twenty-orm/decorators/inject-workspace-repository.decorator'; +import { WorkspaceRepository } from 'src/engine/twenty-orm/repository/workspace.repository'; +import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { ObjectRecordDeleteEvent } from 'src/engine/integrations/event-emitter/types/object-record-delete.event'; + +@WorkspaceQueryHook(`connectedAccount.deleteOne`) +export class ConnectedAccountDeleteOnePreQueryHook + implements WorkspaceQueryHookInstance +{ + constructor( + @InjectWorkspaceRepository(MessageChannelWorkspaceEntity) + private readonly messageChannelRepository: WorkspaceRepository, + private eventEmitter: EventEmitter2, + ) {} + + async execute( + _userId: string, + workspaceId: string, + payload: DeleteOneResolverArgs, + ): Promise { + const connectedAccountId = payload.id; + + const messageChannels = await this.messageChannelRepository.findBy({ + connectedAccountId, + }); + + messageChannels.forEach((messageChannel) => { + this.eventEmitter.emit('messageChannel.deleted', { + workspaceId, + recordId: messageChannel.id, + } satisfies Pick< + ObjectRecordDeleteEvent, + 'workspaceId' | 'recordId' + >); + }); + } +} diff --git a/packages/twenty-server/src/modules/messaging/common/standard-objects/message-channel.workspace-entity.ts b/packages/twenty-server/src/modules/messaging/common/standard-objects/message-channel.workspace-entity.ts index b2bf40b1bc9f..9db9772d33b7 100644 --- a/packages/twenty-server/src/modules/messaging/common/standard-objects/message-channel.workspace-entity.ts +++ b/packages/twenty-server/src/modules/messaging/common/standard-objects/message-channel.workspace-entity.ts @@ -305,6 +305,7 @@ export class MessageChannelWorkspaceEntity extends BaseWorkspaceEntity { inverseSideFieldKey: 'messageChannels', }) connectedAccount: Relation; + connectedAccountId: string; @WorkspaceRelation({ standardId: diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-clean-cache.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-clean-cache.ts new file mode 100644 index 000000000000..8c2bc681f240 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-clean-cache.ts @@ -0,0 +1,38 @@ +import { Logger } from '@nestjs/common'; + +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; +import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator'; +import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum'; +import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service'; + +export type MessagingCleanCacheJobData = { + workspaceId: string; + messageChannelId: string; +}; + +@Processor(MessageQueue.messagingQueue) +export class MessagingCleanCacheJob { + private readonly logger = new Logger(MessagingCleanCacheJob.name); + + constructor( + @InjectCacheStorage(CacheStorageNamespace.Messaging) + private readonly cacheStorage: CacheStorageService, + ) {} + + @Process(MessagingCleanCacheJob.name) + async handle(data: MessagingCleanCacheJobData): Promise { + this.logger.log( + `Deleting message channel ${data.messageChannelId} associated cache in workspace ${data.workspaceId}`, + ); + + await this.cacheStorage.del( + `messages-to-import:${data.workspaceId}:gmail:${data.messageChannelId}`, + ); + + this.logger.log( + `Deleted message channel ${data.messageChannelId} associated cache in workspace ${data.workspaceId}`, + ); + } +} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/listeners/messaging-import-manager-message-channel.listener.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/listeners/messaging-import-manager-message-channel.listener.ts new file mode 100644 index 000000000000..d3f00c95fc42 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/listeners/messaging-import-manager-message-channel.listener.ts @@ -0,0 +1,33 @@ +import { Injectable } from '@nestjs/common'; +import { OnEvent } from '@nestjs/event-emitter'; + +import { ObjectRecordDeleteEvent } from 'src/engine/integrations/event-emitter/types/object-record-delete.event'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; +import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { + MessagingCleanCacheJob, + MessagingCleanCacheJobData, +} from 'src/modules/messaging/message-import-manager/jobs/messaging-clean-cache'; + +@Injectable() +export class MessagingMessageImportManagerMessageChannelListener { + constructor( + @InjectMessageQueue(MessageQueue.messagingQueue) + private readonly messageQueueService: MessageQueueService, + ) {} + + @OnEvent('messageChannel.deleted') + async handleDeletedEvent( + payload: ObjectRecordDeleteEvent, + ) { + await this.messageQueueService.add( + MessagingCleanCacheJob.name, + { + workspaceId: payload.workspaceId, + messageChannelId: payload.recordId, + }, + ); + } +} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts index 87cce001913c..11800472aa6b 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts @@ -15,9 +15,11 @@ import { MessagingMessagesImportCronJob } from 'src/modules/messaging/message-im import { MessagingOngoingStaleCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-ongoing-stale.cron.job'; import { MessagingGmailDriverModule } from 'src/modules/messaging/message-import-manager/drivers/gmail/messaging-gmail-driver.module'; import { MessagingAddSingleMessageToCacheForImportJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-add-single-message-to-cache-for-import.job'; +import { MessagingCleanCacheJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-clean-cache'; import { MessagingMessageListFetchJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job'; import { MessagingMessagesImportJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job'; import { MessagingOngoingStaleJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job'; +import { MessagingMessageImportManagerMessageChannelListener } from 'src/modules/messaging/message-import-manager/listeners/messaging-import-manager-message-channel.listener'; @Module({ imports: [ @@ -39,6 +41,8 @@ import { MessagingOngoingStaleJob } from 'src/modules/messaging/message-import-m MessagingMessagesImportCronJob, MessagingOngoingStaleCronJob, MessagingAddSingleMessageToCacheForImportJob, + MessagingMessageImportManagerMessageChannelListener, + MessagingCleanCacheJob, ], exports: [], })