diff --git a/packages/twenty-server/src/engine/core-modules/auth/auth.module.ts b/packages/twenty-server/src/engine/core-modules/auth/auth.module.ts index 064cb7aad123..62ca6a411053 100644 --- a/packages/twenty-server/src/engine/core-modules/auth/auth.module.ts +++ b/packages/twenty-server/src/engine/core-modules/auth/auth.module.ts @@ -30,7 +30,6 @@ import { WorkspaceManagerModule } from 'src/engine/workspace-manager/workspace-m import { CalendarChannelWorkspaceEntity } from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity'; import { ConnectedAccountModule } from 'src/modules/connected-account/connected-account.module'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; -import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { AuthResolver } from './auth.resolver'; @@ -51,7 +50,6 @@ import { JwtAuthStrategy } from './strategies/jwt.auth.strategy'; ), ObjectMetadataRepositoryModule.forFeature([ ConnectedAccountWorkspaceEntity, - MessageChannelWorkspaceEntity, ]), HttpModule, UserWorkspaceModule, diff --git a/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts b/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts index 4f0efc5f3a5a..157c82ab7fd7 100644 --- a/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts +++ b/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts @@ -23,8 +23,8 @@ import { ConnectedAccountProvider, ConnectedAccountWorkspaceEntity, } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; import { + MessageChannelSyncStage, MessageChannelSyncStatus, MessageChannelType, MessageChannelVisibility, @@ -47,8 +47,6 @@ export class GoogleAPIsService { private readonly environmentService: EnvironmentService, @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity) private readonly connectedAccountRepository: ConnectedAccountRepository, - @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) - private readonly messageChannelRepository: MessageChannelRepository, private readonly accountsToReconnectService: AccountsToReconnectService, ) {} @@ -88,6 +86,11 @@ export class GoogleAPIsService { 'calendarChannel', ); + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); + const workspaceDataSource = await this.twentyORMManager.getDatasource(); await workspaceDataSource.transaction(async (manager: EntityManager) => { @@ -105,7 +108,7 @@ export class GoogleAPIsService { manager, ); - await this.messageChannelRepository.create( + await messageChannelRepository.save( { id: v4(), connectedAccountId: newOrExistingConnectedAccountId, @@ -115,7 +118,7 @@ export class GoogleAPIsService { messageVisibility || MessageChannelVisibility.SHARE_EVERYTHING, syncStatus: MessageChannelSyncStatus.ONGOING, }, - workspaceId, + {}, manager, ); @@ -159,20 +162,27 @@ export class GoogleAPIsService { newOrExistingConnectedAccountId, ); - await this.messageChannelRepository.resetSync( - newOrExistingConnectedAccountId, - workspaceId, + await messageChannelRepository.update( + { + connectedAccountId: newOrExistingConnectedAccountId, + }, + { + syncStage: MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING, + syncStatus: null, + syncCursor: '', + syncStageStartedAt: null, + }, manager, ); } }); if (this.environmentService.get('MESSAGING_PROVIDER_GMAIL_ENABLED')) { - const messageChannels = - await this.messageChannelRepository.getByConnectedAccountId( - newOrExistingConnectedAccountId, - workspaceId, - ); + const messageChannels = await messageChannelRepository.find({ + where: { + connectedAccountId: newOrExistingConnectedAccountId, + }, + }); for (const messageChannel of messageChannels) { await this.messageQueueService.add( diff --git a/packages/twenty-server/src/engine/object-metadata-repository/metadata-to-repository.mapping.ts b/packages/twenty-server/src/engine/object-metadata-repository/metadata-to-repository.mapping.ts index 99a2413d10e4..b2053e44c814 100644 --- a/packages/twenty-server/src/engine/object-metadata-repository/metadata-to-repository.mapping.ts +++ b/packages/twenty-server/src/engine/object-metadata-repository/metadata-to-repository.mapping.ts @@ -1,10 +1,5 @@ import { BlocklistRepository } from 'src/modules/blocklist/repositories/blocklist.repository'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; -import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/common/repositories/message-channel-message-association.repository'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; -import { MessageParticipantRepository } from 'src/modules/messaging/common/repositories/message-participant.repository'; -import { MessageThreadRepository } from 'src/modules/messaging/common/repositories/message-thread.repository'; -import { MessageRepository } from 'src/modules/messaging/common/repositories/message.repository'; import { AuditLogRepository } from 'src/modules/timeline/repositiories/audit-log.repository'; import { TimelineActivityRepository } from 'src/modules/timeline/repositiories/timeline-activity.repository'; import { WorkspaceMemberRepository } from 'src/modules/workspace-member/repositories/workspace-member.repository'; @@ -13,12 +8,6 @@ export const metadataToRepositoryMapping = { AuditLogWorkspaceEntity: AuditLogRepository, BlocklistWorkspaceEntity: BlocklistRepository, ConnectedAccountWorkspaceEntity: ConnectedAccountRepository, - MessageChannelMessageAssociationWorkspaceEntity: - MessageChannelMessageAssociationRepository, - MessageChannelWorkspaceEntity: MessageChannelRepository, - MessageWorkspaceEntity: MessageRepository, - MessageParticipantWorkspaceEntity: MessageParticipantRepository, - MessageThreadWorkspaceEntity: MessageThreadRepository, TimelineActivityWorkspaceEntity: TimelineActivityRepository, WorkspaceMemberWorkspaceEntity: WorkspaceMemberRepository, }; diff --git a/packages/twenty-server/src/modules/contact-creation-manager/services/create-company.service.ts b/packages/twenty-server/src/modules/contact-creation-manager/services/create-company.service.ts index 6e9f3c8956c8..bc3c71c9aae1 100644 --- a/packages/twenty-server/src/modules/contact-creation-manager/services/create-company.service.ts +++ b/packages/twenty-server/src/modules/contact-creation-manager/services/create-company.service.ts @@ -157,7 +157,7 @@ export class CreateCompanyService { }; } - private async createCompanyMap(companies: CompanyWorkspaceEntity[]) { + private createCompanyMap(companies: CompanyWorkspaceEntity[]) { return companies.reduce( (acc, company) => { if (!company.domainName) { diff --git a/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job.ts b/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job.ts index a2941c679d1e..3c7d37204761 100644 --- a/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job.ts +++ b/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job.ts @@ -1,15 +1,15 @@ import { Logger, Scope } from '@nestjs/common'; +import { Any } from 'typeorm'; + import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { BlocklistRepository } from 'src/modules/blocklist/repositories/blocklist.repository'; import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity'; -import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/common/repositories/message-channel-message-association.repository'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; -import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessagingMessageCleanerService } from 'src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service'; export type BlocklistItemDeleteMessagesJobData = { @@ -25,15 +25,10 @@ export class BlocklistItemDeleteMessagesJob { private readonly logger = new Logger(BlocklistItemDeleteMessagesJob.name); constructor( - @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) - private readonly messageChannelRepository: MessageChannelRepository, - @InjectObjectMetadataRepository( - MessageChannelMessageAssociationWorkspaceEntity, - ) - private readonly messageChannelMessageAssociationRepository: MessageChannelMessageAssociationRepository, @InjectObjectMetadataRepository(BlocklistWorkspaceEntity) private readonly blocklistRepository: BlocklistRepository, private readonly threadCleanerService: MessagingMessageCleanerService, + private readonly twentyORMManager: TwentyORMManager, ) {} @Process(BlocklistItemDeleteMessagesJob.name) @@ -65,24 +60,26 @@ export class BlocklistItemDeleteMessagesJob { ); } - const messageChannels = - await this.messageChannelRepository.getIdsByWorkspaceMemberId( - workspaceMemberId, - workspaceId, + const messageChannelMessageAssociationRepository = + await this.twentyORMManager.getRepository( + 'messageChannelMessageAssociation', ); - const messageChannelIds = messageChannels.map(({ id }) => id); - const rolesToDelete: ('from' | 'to')[] = ['from', 'to']; - for (const messageChannelId of messageChannelIds) { - await this.messageChannelMessageAssociationRepository.deleteByMessageParticipantHandleAndMessageChannelIdAndRoles( - handle, - messageChannelId, - rolesToDelete, - workspaceId, - ); - } + await messageChannelMessageAssociationRepository.delete({ + messageChannel: { + connectedAccount: { + accountOwnerId: workspaceMemberId, + }, + }, + message: { + messageParticipants: { + handle, + role: Any(rolesToDelete), + }, + }, + }); await this.threadCleanerService.cleanWorkspaceThreads(workspaceId); diff --git a/packages/twenty-server/src/modules/messaging/blocklist-manager/listeners/messaging-blocklist.listener.ts b/packages/twenty-server/src/modules/messaging/blocklist-manager/listeners/messaging-blocklist.listener.ts index 60433a2902fc..996aca092466 100644 --- a/packages/twenty-server/src/modules/messaging/blocklist-manager/listeners/messaging-blocklist.listener.ts +++ b/packages/twenty-server/src/modules/messaging/blocklist-manager/listeners/messaging-blocklist.listener.ts @@ -8,6 +8,7 @@ import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decora import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; @@ -15,7 +16,6 @@ import { BlocklistItemDeleteMessagesJob, BlocklistItemDeleteMessagesJobData, } from 'src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; @@ -27,8 +27,7 @@ export class MessagingBlocklistListener { @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity) private readonly connectedAccountRepository: ConnectedAccountRepository, private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService, - @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) - private readonly messageChannelRepository: MessageChannelRepository, + private readonly twentyORMManager: TwentyORMManager, ) {} @OnEvent('blocklist.created') @@ -61,14 +60,19 @@ export class MessagingBlocklistListener { return; } - const messageChannel = - await this.messageChannelRepository.getByConnectedAccountId( - connectedAccount[0].id, - workspaceId, + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', ); + const messageChannel = await messageChannelRepository.findOneOrFail({ + where: { + connectedAccountId: connectedAccount[0].id, + }, + }); + await this.messagingChannelSyncStatusService.resetAndScheduleFullMessageListFetch( - messageChannel[0].id, + messageChannel.id, workspaceId, ); } @@ -98,14 +102,19 @@ export class MessagingBlocklistListener { return; } - const messageChannel = - await this.messageChannelRepository.getByConnectedAccountId( - connectedAccount[0].id, - workspaceId, + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', ); + const messageChannel = await messageChannelRepository.findOneOrFail({ + where: { + connectedAccountId: connectedAccount[0].id, + }, + }); + await this.messagingChannelSyncStatusService.resetAndScheduleFullMessageListFetch( - messageChannel[0].id, + messageChannel.id, workspaceId, ); } diff --git a/packages/twenty-server/src/modules/messaging/common/messaging-common.module.ts b/packages/twenty-server/src/modules/messaging/common/messaging-common.module.ts index d8836222bb99..8f570152438a 100644 --- a/packages/twenty-server/src/modules/messaging/common/messaging-common.module.ts +++ b/packages/twenty-server/src/modules/messaging/common/messaging-common.module.ts @@ -2,22 +2,13 @@ import { Module } from '@nestjs/common'; import { TypeOrmModule } from '@nestjs/typeorm'; import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity'; -import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module'; import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module'; import { ConnectedAccountModule } from 'src/modules/connected-account/connected-account.module'; import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; -import { MessageParticipantWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-participant.workspace-entity'; -import { MessageThreadWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-thread.workspace-entity'; -import { MessageWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message.workspace-entity'; @Module({ imports: [ WorkspaceDataSourceModule, - ObjectMetadataRepositoryModule.forFeature([ - MessageParticipantWorkspaceEntity, - MessageWorkspaceEntity, - MessageThreadWorkspaceEntity, - ]), TypeOrmModule.forFeature([FeatureFlagEntity], 'core'), ConnectedAccountModule, ], diff --git a/packages/twenty-server/src/modules/messaging/common/query-hooks/message/can-access-message-thread.service.ts b/packages/twenty-server/src/modules/messaging/common/query-hooks/message/can-access-message-thread.service.ts index f79c972c6100..44141f3dde92 100644 --- a/packages/twenty-server/src/modules/messaging/common/query-hooks/message/can-access-message-thread.service.ts +++ b/packages/twenty-server/src/modules/messaging/common/query-hooks/message/can-access-message-thread.service.ts @@ -1,11 +1,12 @@ import { ForbiddenException } from '@nestjs/common'; import groupBy from 'lodash.groupby'; +import { Any } from 'typeorm'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { WorkspaceMemberRepository } from 'src/modules/workspace-member/repositories/workspace-member.repository'; import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity'; @@ -13,12 +14,11 @@ import { isDefined } from 'src/utils/is-defined'; export class CanAccessMessageThreadService { constructor( - @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) - private readonly messageChannelService: MessageChannelRepository, @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity) private readonly connectedAccountRepository: ConnectedAccountRepository, @InjectObjectMetadataRepository(WorkspaceMemberWorkspaceEntity) private readonly workspaceMemberRepository: WorkspaceMemberRepository, + private readonly twentyORMManager: TwentyORMManager, ) {} public async canAccessMessageThread( @@ -26,12 +26,19 @@ export class CanAccessMessageThreadService { workspaceId: string, messageChannelMessageAssociations: any[], ) { - const messageChannels = await this.messageChannelService.getByIds( - messageChannelMessageAssociations.map( - (association) => association.messageChannelId, - ), - workspaceId, - ); + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); + const messageChannels = await messageChannelRepository.find({ + where: { + id: Any( + messageChannelMessageAssociations.map( + (association) => association.messageChannelId, + ), + ), + }, + }); const messageChannelsGroupByVisibility = groupBy( messageChannels, diff --git a/packages/twenty-server/src/modules/messaging/common/query-hooks/messaging-query-hook.module.ts b/packages/twenty-server/src/modules/messaging/common/query-hooks/messaging-query-hook.module.ts index 4268de828017..9c84640768af 100644 --- a/packages/twenty-server/src/modules/messaging/common/query-hooks/messaging-query-hook.module.ts +++ b/packages/twenty-server/src/modules/messaging/common/query-hooks/messaging-query-hook.module.ts @@ -1,19 +1,15 @@ import { Module } from '@nestjs/common'; import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module'; -import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { CanAccessMessageThreadService } from 'src/modules/messaging/common/query-hooks/message/can-access-message-thread.service'; import { MessageFindManyPreQueryHook } from 'src/modules/messaging/common/query-hooks/message/message-find-many.pre-query.hook'; import { MessageFindOnePreQueryHook } from 'src/modules/messaging/common/query-hooks/message/message-find-one.pre-query-hook'; -import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; -import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity'; @Module({ imports: [ ObjectMetadataRepositoryModule.forFeature([ - MessageChannelMessageAssociationWorkspaceEntity, - MessageChannelWorkspaceEntity, ConnectedAccountWorkspaceEntity, WorkspaceMemberWorkspaceEntity, ]), diff --git a/packages/twenty-server/src/modules/messaging/common/repositories/message-channel-message-association.repository.ts b/packages/twenty-server/src/modules/messaging/common/repositories/message-channel-message-association.repository.ts deleted file mode 100644 index 0013e036f5e7..000000000000 --- a/packages/twenty-server/src/modules/messaging/common/repositories/message-channel-message-association.repository.ts +++ /dev/null @@ -1,87 +0,0 @@ -import { Injectable } from '@nestjs/common'; - -import { EntityManager } from 'typeorm'; - -import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; - -@Injectable() -export class MessageChannelMessageAssociationRepository { - constructor( - private readonly workspaceDataSourceService: WorkspaceDataSourceService, - ) {} - - public async deleteByMessageParticipantHandleAndMessageChannelIdAndRoles( - messageParticipantHandle: string, - messageChannelId: string, - rolesToDelete: ('from' | 'to' | 'cc' | 'bcc')[], - workspaceId: string, - transactionManager?: EntityManager, - ) { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - const isHandleDomain = messageParticipantHandle.startsWith('@'); - - const messageChannel = - await this.workspaceDataSourceService.executeRawQuery( - `SELECT * FROM ${dataSourceSchema}."messageChannel" - WHERE "id" = $1`, - [messageChannelId], - workspaceId, - transactionManager, - ); - - const messageChannelHandle = messageChannel[0].handle; - - const messageChannelMessageAssociationIdsToDelete = - await this.workspaceDataSourceService.executeRawQuery( - `SELECT "messageChannelMessageAssociation".id - FROM ${dataSourceSchema}."messageChannelMessageAssociation" "messageChannelMessageAssociation" - JOIN ${dataSourceSchema}."message" ON "messageChannelMessageAssociation"."messageId" = ${dataSourceSchema}."message"."id" - JOIN ${dataSourceSchema}."messageParticipant" "messageParticipant" ON ${dataSourceSchema}."message"."id" = "messageParticipant"."messageId" - WHERE "messageParticipant"."handle" != $1 - AND "messageParticipant"."handle" ${isHandleDomain ? '~*' : '='} $2 - AND "messageParticipant"."role" = ANY($3) - AND "messageChannelMessageAssociation"."messageChannelId" = $4`, - [ - messageChannelHandle, - isHandleDomain - ? // eslint-disable-next-line no-useless-escape - `.+@(.+\.)?${messageParticipantHandle.slice(1)}` - : messageParticipantHandle, - rolesToDelete, - messageChannelId, - ], - workspaceId, - transactionManager, - ); - - const messageChannelMessageAssociationIdsToDeleteArray = - messageChannelMessageAssociationIdsToDelete.map( - (messageChannelMessageAssociation: { id: string }) => - messageChannelMessageAssociation.id, - ); - - await this.deleteByIds( - messageChannelMessageAssociationIdsToDeleteArray, - workspaceId, - transactionManager, - ); - } - - public async deleteByIds( - ids: string[], - workspaceId: string, - transactionManager?: EntityManager, - ) { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `DELETE FROM ${dataSourceSchema}."messageChannelMessageAssociation" WHERE "id" = ANY($1)`, - [ids], - workspaceId, - transactionManager, - ); - } -} diff --git a/packages/twenty-server/src/modules/messaging/common/repositories/message-channel.repository.ts b/packages/twenty-server/src/modules/messaging/common/repositories/message-channel.repository.ts deleted file mode 100644 index 5b3cb4a45202..000000000000 --- a/packages/twenty-server/src/modules/messaging/common/repositories/message-channel.repository.ts +++ /dev/null @@ -1,320 +0,0 @@ -import { Injectable } from '@nestjs/common'; - -import { EntityManager } from 'typeorm'; - -import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; -import { - MessageChannelWorkspaceEntity, - MessageChannelSyncStatus, - MessageChannelSyncStage, -} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; - -@Injectable() -export class MessageChannelRepository { - constructor( - private readonly workspaceDataSourceService: WorkspaceDataSourceService, - ) {} - - public async create( - messageChannel: Pick< - MessageChannelWorkspaceEntity, - | 'id' - | 'connectedAccountId' - | 'type' - | 'handle' - | 'visibility' - | 'syncStatus' - >, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `INSERT INTO ${dataSourceSchema}."messageChannel" ("id", "connectedAccountId", "type", "handle", "visibility", "syncStatus") - VALUES ($1, $2, $3, $4, $5, $6)`, - [ - messageChannel.id, - messageChannel.connectedAccountId, - messageChannel.type, - messageChannel.handle, - messageChannel.visibility, - messageChannel.syncStatus, - ], - workspaceId, - transactionManager, - ); - } - - public async resetSync( - connectedAccountId: string, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `UPDATE ${dataSourceSchema}."messageChannel" - SET "syncStatus" = NULL, - "syncStage" = '${MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING}', - "syncCursor" = '', - "syncStageStartedAt" = NULL - WHERE "connectedAccountId" = $1`, - [connectedAccountId], - workspaceId, - transactionManager, - ); - } - - public async getAll( - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - return await this.workspaceDataSourceService.executeRawQuery( - `SELECT * FROM ${dataSourceSchema}."messageChannel"`, - [], - workspaceId, - transactionManager, - ); - } - - public async getByConnectedAccountId( - connectedAccountId: string, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - return await this.workspaceDataSourceService.executeRawQuery( - `SELECT * FROM ${dataSourceSchema}."messageChannel" WHERE "connectedAccountId" = $1 AND "type" = 'email' LIMIT 1`, - [connectedAccountId], - workspaceId, - transactionManager, - ); - } - - public async getFirstByConnectedAccountIdOrFail( - connectedAccountId: string, - workspaceId: string, - ): Promise { - const messageChannel = await this.getFirstByConnectedAccountId( - connectedAccountId, - workspaceId, - ); - - if (!messageChannel) { - throw new Error( - `Message channel for connected account ${connectedAccountId} not found in workspace ${workspaceId}`, - ); - } - - return messageChannel; - } - - public async getFirstByConnectedAccountId( - connectedAccountId: string, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const messageChannels = await this.getByConnectedAccountId( - connectedAccountId, - workspaceId, - transactionManager, - ); - - return messageChannels[0]; - } - - public async getByIds( - ids: string[], - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - return await this.workspaceDataSourceService.executeRawQuery( - `SELECT * FROM ${dataSourceSchema}."messageChannel" WHERE "id" = ANY($1)`, - [ids], - workspaceId, - transactionManager, - ); - } - - public async getById( - id: string, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - const messageChannels = - await this.workspaceDataSourceService.executeRawQuery( - `SELECT * FROM ${dataSourceSchema}."messageChannel" WHERE "id" = $1`, - [id], - workspaceId, - transactionManager, - ); - - return messageChannels[0]; - } - - public async getIdsByWorkspaceMemberId( - workspaceMemberId: string, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - const messageChannelIds = - await this.workspaceDataSourceService.executeRawQuery( - `SELECT "messageChannel".id FROM ${dataSourceSchema}."messageChannel" "messageChannel" - JOIN ${dataSourceSchema}."connectedAccount" ON "messageChannel"."connectedAccountId" = ${dataSourceSchema}."connectedAccount"."id" - WHERE ${dataSourceSchema}."connectedAccount"."accountOwnerId" = $1`, - [workspaceMemberId], - workspaceId, - transactionManager, - ); - - return messageChannelIds; - } - - public async updateSyncStatus( - id: string, - syncStatus: MessageChannelSyncStatus, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - const needsToUpdateSyncedAt = - syncStatus === MessageChannelSyncStatus.ACTIVE; - - await this.workspaceDataSourceService.executeRawQuery( - `UPDATE ${dataSourceSchema}."messageChannel" SET "syncStatus" = $1 ${ - needsToUpdateSyncedAt ? `, "syncedAt" = NOW()` : '' - } WHERE "id" = $2`, - [syncStatus, id], - workspaceId, - transactionManager, - ); - } - - public async updateSyncStage( - id: string, - syncStage: MessageChannelSyncStage, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - const needsToUpdateSyncStageStartedAt = - syncStage === MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING || - syncStage === MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING; - - await this.workspaceDataSourceService.executeRawQuery( - `UPDATE ${dataSourceSchema}."messageChannel" SET "syncStage" = $1 ${ - needsToUpdateSyncStageStartedAt ? `, "syncStageStartedAt" = NOW()` : '' - } WHERE "id" = $2`, - [syncStage, id], - workspaceId, - transactionManager, - ); - } - - public async resetSyncStageStartedAt( - id: string, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `UPDATE ${dataSourceSchema}."messageChannel" SET "syncStageStartedAt" = NULL WHERE "id" = $1`, - [id], - workspaceId, - transactionManager, - ); - } - - public async updateLastSyncCursorIfHigher( - id: string, - syncCursor: string, - workspaceId: string, - transactionManager?: EntityManager, - ) { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `UPDATE ${dataSourceSchema}."messageChannel" SET "syncCursor" = $1 - WHERE "id" = $2 - AND ("syncCursor" < $1 OR "syncCursor" = '')`, - [syncCursor, id], - workspaceId, - transactionManager, - ); - } - - public async resetSyncCursor( - id: string, - workspaceId: string, - transactionManager?: EntityManager, - ) { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `UPDATE ${dataSourceSchema}."messageChannel" SET "syncCursor" = '' - WHERE "id" = $1`, - [id], - workspaceId, - transactionManager, - ); - } - - public async incrementThrottleFailureCount( - id: string, - workspaceId: string, - transactionManager?: EntityManager, - ) { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `UPDATE ${dataSourceSchema}."messageChannel" SET "throttleFailureCount" = "throttleFailureCount" + 1 - WHERE "id" = $1`, - [id], - workspaceId, - transactionManager, - ); - } - - public async resetThrottleFailureCount( - id: string, - workspaceId: string, - transactionManager?: EntityManager, - ) { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `UPDATE ${dataSourceSchema}."messageChannel" SET "throttleFailureCount" = 0 - WHERE "id" = $1`, - [id], - workspaceId, - transactionManager, - ); - } -} diff --git a/packages/twenty-server/src/modules/messaging/common/repositories/message-participant.repository.ts b/packages/twenty-server/src/modules/messaging/common/repositories/message-participant.repository.ts deleted file mode 100644 index 2cb2214f1168..000000000000 --- a/packages/twenty-server/src/modules/messaging/common/repositories/message-participant.repository.ts +++ /dev/null @@ -1,209 +0,0 @@ -import { Injectable } from '@nestjs/common'; - -import { EntityManager } from 'typeorm'; - -import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; -import { MessageParticipantWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-participant.workspace-entity'; -import { ParticipantWithId } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message'; - -@Injectable() -export class MessageParticipantRepository { - constructor( - private readonly workspaceDataSourceService: WorkspaceDataSourceService, - ) {} - - public async getByHandles( - handles: string[], - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - return await this.workspaceDataSourceService.executeRawQuery( - `SELECT * FROM ${dataSourceSchema}."messageParticipant" WHERE "handle" = ANY($1)`, - [handles], - workspaceId, - transactionManager, - ); - } - - public async updateParticipantsPersonId( - participantIds: string[], - personId: string, - workspaceId: string, - transactionManager?: EntityManager, - ) { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `UPDATE ${dataSourceSchema}."messageParticipant" SET "personId" = $1 WHERE "id" = ANY($2)`, - [personId, participantIds], - workspaceId, - transactionManager, - ); - } - - public async updateParticipantsPersonIdAndReturn( - participantIds: string[], - personId: string, - workspaceId: string, - transactionManager?: EntityManager, - ) { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - return await this.workspaceDataSourceService.executeRawQuery( - `UPDATE ${dataSourceSchema}."messageParticipant" SET "personId" = $1 WHERE "id" = ANY($2) RETURNING *`, - [personId, participantIds], - workspaceId, - transactionManager, - ); - } - - public async updateParticipantsWorkspaceMemberId( - participantIds: string[], - workspaceMemberId: string, - workspaceId: string, - transactionManager?: EntityManager, - ) { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `UPDATE ${dataSourceSchema}."messageParticipant" SET "workspaceMemberId" = $1 WHERE "id" = ANY($2)`, - [workspaceMemberId, participantIds], - workspaceId, - transactionManager, - ); - } - - public async removePersonIdByHandle( - handle: string, - workspaceId: string, - transactionManager?: EntityManager, - ) { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `UPDATE ${dataSourceSchema}."messageParticipant" SET "personId" = NULL WHERE "handle" = $1`, - [handle], - workspaceId, - transactionManager, - ); - } - - public async removeWorkspaceMemberIdByHandle( - handle: string, - workspaceId: string, - transactionManager?: EntityManager, - ) { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `UPDATE ${dataSourceSchema}."messageParticipant" SET "workspaceMemberId" = NULL WHERE "handle" = $1`, - [handle], - workspaceId, - transactionManager, - ); - } - - public async getByMessageChannelIdWithoutPersonIdAndWorkspaceMemberIdAndMessageOutgoing( - messageChannelId: string, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - if (!messageChannelId || !workspaceId) { - return []; - } - - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - const messageParticipants: ParticipantWithId[] = - await this.workspaceDataSourceService.executeRawQuery( - `SELECT "messageParticipant".id, - "messageParticipant"."role", - "messageParticipant"."handle", - "messageParticipant"."displayName", - "messageParticipant"."personId", - "messageParticipant"."workspaceMemberId", - "messageParticipant"."messageId" - FROM ${dataSourceSchema}."messageParticipant" "messageParticipant" - LEFT JOIN ${dataSourceSchema}."message" ON "messageParticipant"."messageId" = ${dataSourceSchema}."message"."id" - LEFT JOIN ${dataSourceSchema}."messageChannelMessageAssociation" ON ${dataSourceSchema}."messageChannelMessageAssociation"."messageId" = ${dataSourceSchema}."message"."id" - WHERE ${dataSourceSchema}."messageChannelMessageAssociation"."messageChannelId" = $1 - AND "messageParticipant"."personId" IS NULL - AND "messageParticipant"."workspaceMemberId" IS NULL - AND ${dataSourceSchema}."message"."direction" = 'outgoing'`, - [messageChannelId], - workspaceId, - transactionManager, - ); - - return messageParticipants; - } - - public async getByMessageChannelIdWithoutPersonIdAndWorkspaceMemberId( - messageChannelId: string, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - if (!messageChannelId || !workspaceId) { - return []; - } - - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - const messageParticipants: ParticipantWithId[] = - await this.workspaceDataSourceService.executeRawQuery( - `SELECT "messageParticipant".id, - "messageParticipant"."role", - "messageParticipant"."handle", - "messageParticipant"."displayName", - "messageParticipant"."personId", - "messageParticipant"."workspaceMemberId", - "messageParticipant"."messageId" - FROM ${dataSourceSchema}."messageParticipant" "messageParticipant" - LEFT JOIN ${dataSourceSchema}."message" ON "messageParticipant"."messageId" = ${dataSourceSchema}."message"."id" - LEFT JOIN ${dataSourceSchema}."messageChannelMessageAssociation" ON ${dataSourceSchema}."messageChannelMessageAssociation"."messageId" = ${dataSourceSchema}."message"."id" - WHERE ${dataSourceSchema}."messageChannelMessageAssociation"."messageChannelId" = $1 - AND "messageParticipant"."personId" IS NULL - AND "messageParticipant"."workspaceMemberId" IS NULL`, - [messageChannelId], - workspaceId, - transactionManager, - ); - - return messageParticipants; - } - - public async getWithoutPersonIdAndWorkspaceMemberId( - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - if (!workspaceId) { - throw new Error('WorkspaceId is required'); - } - - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - const messageParticipants: ParticipantWithId[] = - await this.workspaceDataSourceService.executeRawQuery( - `SELECT "messageParticipant".* - FROM ${dataSourceSchema}."messageParticipant" "messageParticipant" - WHERE "messageParticipant"."personId" IS NULL - AND "messageParticipant"."workspaceMemberId" IS NULL`, - [], - workspaceId, - transactionManager, - ); - - return messageParticipants; - } -} diff --git a/packages/twenty-server/src/modules/messaging/common/repositories/message-thread.repository.ts b/packages/twenty-server/src/modules/messaging/common/repositories/message-thread.repository.ts deleted file mode 100644 index bf0e9d7c90af..000000000000 --- a/packages/twenty-server/src/modules/messaging/common/repositories/message-thread.repository.ts +++ /dev/null @@ -1,67 +0,0 @@ -import { Injectable } from '@nestjs/common'; - -import { EntityManager } from 'typeorm'; - -import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; - -@Injectable() -export class MessageThreadRepository { - constructor( - private readonly workspaceDataSourceService: WorkspaceDataSourceService, - ) {} - - public async getOrphanThreadIdsPaginated( - limit: number, - offset: number, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - const orphanThreads = await this.workspaceDataSourceService.executeRawQuery( - `SELECT mt.id - FROM ${dataSourceSchema}."messageThread" mt - LEFT JOIN ${dataSourceSchema}."message" m ON mt.id = m."messageThreadId" - WHERE m."messageThreadId" IS NULL - LIMIT $1 OFFSET $2`, - [limit, offset], - workspaceId, - transactionManager, - ); - - return orphanThreads.map(({ id }) => id); - } - - public async deleteByIds( - messageThreadIds: string[], - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `DELETE FROM ${dataSourceSchema}."messageThread" WHERE id = ANY($1)`, - [messageThreadIds], - workspaceId, - transactionManager, - ); - } - - public async insert( - messageThreadId: string, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `INSERT INTO ${dataSourceSchema}."messageThread" (id) VALUES ($1)`, - [messageThreadId], - workspaceId, - transactionManager, - ); - } -} diff --git a/packages/twenty-server/src/modules/messaging/common/repositories/message.repository.ts b/packages/twenty-server/src/modules/messaging/common/repositories/message.repository.ts deleted file mode 100644 index 5f1d3eb25a4c..000000000000 --- a/packages/twenty-server/src/modules/messaging/common/repositories/message.repository.ts +++ /dev/null @@ -1,137 +0,0 @@ -import { Injectable } from '@nestjs/common'; - -import { EntityManager } from 'typeorm'; - -import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; -import { MessageWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message.workspace-entity'; - -@Injectable() -export class MessageRepository { - constructor( - private readonly workspaceDataSourceService: WorkspaceDataSourceService, - ) {} - - public async getNonAssociatedMessageIdsPaginated( - limit: number, - offset: number, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - const nonAssociatedMessages = - await this.workspaceDataSourceService.executeRawQuery( - `SELECT m.id FROM ${dataSourceSchema}."message" m - LEFT JOIN ${dataSourceSchema}."messageChannelMessageAssociation" mcma - ON m.id = mcma."messageId" - WHERE mcma.id IS NULL - LIMIT $1 OFFSET $2`, - [limit, offset], - workspaceId, - transactionManager, - ); - - return nonAssociatedMessages.map(({ id }) => id); - } - - public async getFirstOrNullByHeaderMessageId( - headerMessageId: string, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - const messages = await this.workspaceDataSourceService.executeRawQuery( - `SELECT * FROM ${dataSourceSchema}."message" WHERE "headerMessageId" = $1 LIMIT 1`, - [headerMessageId], - workspaceId, - transactionManager, - ); - - if (!messages || messages.length === 0) { - return null; - } - - return messages[0]; - } - - public async getByIds( - messageIds: string[], - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - return await this.workspaceDataSourceService.executeRawQuery( - `SELECT * FROM ${dataSourceSchema}."message" WHERE "id" = ANY($1)`, - [messageIds], - workspaceId, - transactionManager, - ); - } - - public async deleteByIds( - messageIds: string[], - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `DELETE FROM ${dataSourceSchema}."message" WHERE "id" = ANY($1)`, - [messageIds], - workspaceId, - transactionManager, - ); - } - - public async getByMessageThreadIds( - messageThreadIds: string[], - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - return await this.workspaceDataSourceService.executeRawQuery( - `SELECT * FROM ${dataSourceSchema}."message" WHERE "messageThreadId" = ANY($1)`, - [messageThreadIds], - workspaceId, - transactionManager, - ); - } - - public async insert( - id: string, - headerMessageId: string, - subject: string, - receivedAt: Date, - messageDirection: string, - messageThreadId: string, - text: string, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `INSERT INTO ${dataSourceSchema}."message" ("id", "headerMessageId", "subject", "receivedAt", "direction", "messageThreadId", "text") VALUES ($1, $2, $3, $4, $5, $6, $7)`, - [ - id, - headerMessageId, - subject, - receivedAt, - messageDirection, - messageThreadId, - text, - ], - workspaceId, - transactionManager, - ); - } -} diff --git a/packages/twenty-server/src/modules/messaging/common/services/messaging-channel-sync-status.service.ts b/packages/twenty-server/src/modules/messaging/common/services/messaging-channel-sync-status.service.ts index 2b326bc39296..49c52cb8333a 100644 --- a/packages/twenty-server/src/modules/messaging/common/services/messaging-channel-sync-status.service.ts +++ b/packages/twenty-server/src/modules/messaging/common/services/messaging-channel-sync-status.service.ts @@ -3,11 +3,9 @@ import { Injectable } from '@nestjs/common'; import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service'; import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator'; import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum'; -import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { AccountsToReconnectService } from 'src/modules/connected-account/services/accounts-to-reconnect.service'; import { AccountsToReconnectKeys } from 'src/modules/connected-account/types/accounts-to-reconnect-key-value.type'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; import { MessageChannelSyncStage, MessageChannelSyncStatus, @@ -17,44 +15,57 @@ import { @Injectable() export class MessagingChannelSyncStatusService { constructor( - @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) - private readonly messageChannelRepository: MessageChannelRepository, @InjectCacheStorage(CacheStorageNamespace.Messaging) private readonly cacheStorage: CacheStorageService, private readonly twentyORMManager: TwentyORMManager, private readonly accountsToReconnectService: AccountsToReconnectService, ) {} - public async scheduleFullMessageListFetch( - messageChannelId: string, - workspaceId: string, - ) { - await this.messageChannelRepository.updateSyncStage( - messageChannelId, - MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING, - workspaceId, + public async scheduleFullMessageListFetch(messageChannelId: string) { + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); + + await messageChannelRepository.update( + { + id: messageChannelId, + }, + { + syncStage: MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING, + }, ); } - public async schedulePartialMessageListFetch( - messageChannelId: string, - workspaceId: string, - ) { - await this.messageChannelRepository.updateSyncStage( - messageChannelId, - MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING, - workspaceId, + public async schedulePartialMessageListFetch(messageChannelId: string) { + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); + + await messageChannelRepository.update( + { + id: messageChannelId, + }, + { + syncStage: MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING, + }, ); } - public async scheduleMessagesImport( - messageChannelId: string, - workspaceId: string, - ) { - await this.messageChannelRepository.updateSyncStage( - messageChannelId, - MessageChannelSyncStage.MESSAGES_IMPORT_PENDING, - workspaceId, + public async scheduleMessagesImport(messageChannelId: string) { + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); + + await messageChannelRepository.update( + { + id: messageChannelId, + }, + { + syncStage: MessageChannelSyncStage.MESSAGES_IMPORT_PENDING, + }, ); } @@ -66,62 +77,75 @@ export class MessagingChannelSyncStatusService { `messages-to-import:${workspaceId}:gmail:${messageChannelId}`, ); - await this.messageChannelRepository.resetSyncCursor( - messageChannelId, - workspaceId, - ); - - await this.messageChannelRepository.resetSyncStageStartedAt( - messageChannelId, - workspaceId, - ); + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); - await this.messageChannelRepository.resetThrottleFailureCount( - messageChannelId, - workspaceId, + await messageChannelRepository.update( + { + id: messageChannelId, + }, + { + syncCursor: '', + syncStageStartedAt: null, + throttleFailureCount: 0, + }, ); - await this.scheduleFullMessageListFetch(messageChannelId, workspaceId); + await this.scheduleFullMessageListFetch(messageChannelId); } - public async markAsMessagesListFetchOngoing( - messageChannelId: string, - workspaceId: string, - ) { - await this.messageChannelRepository.updateSyncStage( - messageChannelId, - MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING, - workspaceId, - ); + public async markAsMessagesListFetchOngoing(messageChannelId: string) { + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); - await this.messageChannelRepository.updateSyncStatus( - messageChannelId, - MessageChannelSyncStatus.ONGOING, - workspaceId, + await messageChannelRepository.update( + { + id: messageChannelId, + }, + { + syncStage: MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING, + syncStatus: MessageChannelSyncStatus.ONGOING, + }, ); } public async markAsCompletedAndSchedulePartialMessageListFetch( messageChannelId: string, - workspaceId: string, ) { - await this.messageChannelRepository.updateSyncStatus( - messageChannelId, - MessageChannelSyncStatus.ACTIVE, - workspaceId, + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); + + await messageChannelRepository.update( + { + id: messageChannelId, + }, + { + syncStatus: MessageChannelSyncStatus.ACTIVE, + }, ); - await this.schedulePartialMessageListFetch(messageChannelId, workspaceId); + await this.schedulePartialMessageListFetch(messageChannelId); } - public async markAsMessagesImportOngoing( - messageChannelId: string, - workspaceId: string, - ) { - await this.messageChannelRepository.updateSyncStage( - messageChannelId, - MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING, - workspaceId, + public async markAsMessagesImportOngoing(messageChannelId: string) { + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); + + await messageChannelRepository.update( + { + id: messageChannelId, + }, + { + syncStage: MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING, + }, ); } @@ -133,16 +157,19 @@ export class MessagingChannelSyncStatusService { `messages-to-import:${workspaceId}:gmail:${messageChannelId}`, ); - await this.messageChannelRepository.updateSyncStage( - messageChannelId, - MessageChannelSyncStage.FAILED, - workspaceId, - ); + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); - await this.messageChannelRepository.updateSyncStatus( - messageChannelId, - MessageChannelSyncStatus.FAILED_UNKNOWN, - workspaceId, + await messageChannelRepository.update( + { + id: messageChannelId, + }, + { + syncStage: MessageChannelSyncStage.FAILED, + syncStatus: MessageChannelSyncStatus.FAILED_UNKNOWN, + }, ); } @@ -154,16 +181,19 @@ export class MessagingChannelSyncStatusService { `messages-to-import:${workspaceId}:gmail:${messageChannelId}`, ); - await this.messageChannelRepository.updateSyncStage( - messageChannelId, - MessageChannelSyncStage.FAILED, - workspaceId, - ); + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); - await this.messageChannelRepository.updateSyncStatus( - messageChannelId, - MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS, - workspaceId, + await messageChannelRepository.update( + { + id: messageChannelId, + }, + { + syncStage: MessageChannelSyncStage.FAILED, + syncStatus: MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS, + }, ); await this.addToAccountsToReconnect(messageChannelId, workspaceId); diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts index 7e6f42d978c3..0eb06acc5ec1 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts @@ -1,4 +1,3 @@ -import { Logger } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; @@ -12,8 +11,7 @@ import { Process } from 'src/engine/integrations/message-queue/decorators/proces import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; -import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; +import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; import { MessageChannelSyncStage, MessageChannelWorkspaceEntity, @@ -25,15 +23,12 @@ import { @Processor(MessageQueue.cronQueue) export class MessagingMessageListFetchCronJob { - private readonly logger = new Logger(MessagingMessageListFetchCronJob.name); - constructor( @InjectRepository(Workspace, 'core') private readonly workspaceRepository: Repository, @InjectMessageQueue(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, - @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) - private readonly messageChannelRepository: MessageChannelRepository, + private readonly twentyORMGlobalManager: TwentyORMGlobalManager, ) {} @Process(MessagingMessageListFetchCronJob.name) @@ -45,9 +40,15 @@ export class MessagingMessageListFetchCronJob { }); for (const activeWorkspace of activeWorkspaces) { - const messageChannels = await this.messageChannelRepository.getAll( - activeWorkspace.id, - ); + const messageChannelRepository = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + activeWorkspace.id, + 'messageChannel', + ); + + const messageChannels = await messageChannelRepository.find({ + select: ['id'], + }); for (const messageChannel of messageChannels) { if ( diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts index e47e49d852d5..e3da8b05bbc9 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts @@ -12,8 +12,7 @@ import { Process } from 'src/engine/integrations/message-queue/decorators/proces import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; -import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; +import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; import { MessageChannelSyncStage, MessageChannelWorkspaceEntity, @@ -32,8 +31,7 @@ export class MessagingMessagesImportCronJob { private readonly workspaceRepository: Repository, @InjectMessageQueue(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, - @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) - private readonly messageChannelRepository: MessageChannelRepository, + private readonly twentyORMGlobalManager: TwentyORMGlobalManager, ) {} @Process(MessagingMessagesImportCronJob.name) @@ -45,9 +43,15 @@ export class MessagingMessagesImportCronJob { }); for (const activeWorkspace of activeWorkspaces) { - const messageChannels = await this.messageChannelRepository.getAll( - activeWorkspace.id, - ); + const messageChannelRepository = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + activeWorkspace.id, + 'messageChannel', + ); + + const messageChannels = await messageChannelRepository.find({ + select: ['id', 'isSyncEnabled', 'syncStage'], + }); for (const messageChannel of messageChannels) { if ( diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/messaging-gmail-driver.module.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/messaging-gmail-driver.module.ts index 008bc157614f..ac5bf9ea62ba 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/messaging-gmail-driver.module.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/messaging-gmail-driver.module.ts @@ -12,8 +12,6 @@ import { EmailAliasManagerModule } from 'src/modules/connected-account/email-ali import { OAuth2ClientManagerModule } from 'src/modules/connected-account/oauth2-client-manager/oauth2-client-manager.module'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { MessagingCommonModule } from 'src/modules/messaging/common/messaging-common.module'; -import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; -import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessagingGmailClientProvider } from 'src/modules/messaging/message-import-manager/drivers/gmail/providers/messaging-gmail-client.provider'; import { MessagingGmailFetchByBatchesService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-by-batch.service'; import { MessagingGmailFetchMessagesByBatchesService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-messages-by-batches.service'; @@ -29,8 +27,6 @@ import { MessageParticipantManagerModule } from 'src/modules/messaging/message-p EnvironmentModule, ObjectMetadataRepositoryModule.forFeature([ ConnectedAccountWorkspaceEntity, - MessageChannelWorkspaceEntity, - MessageChannelMessageAssociationWorkspaceEntity, BlocklistWorkspaceEntity, ]), MessagingCommonModule, diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts index 98dd34d4994f..09854fbf4f3a 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts @@ -4,10 +4,10 @@ import { Process } from 'src/engine/integrations/message-queue/decorators/proces import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { isThrottled } from 'src/modules/connected-account/utils/is-throttled'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; import { MessageChannelSyncStage, MessageChannelWorkspaceEntity, @@ -33,9 +33,8 @@ export class MessagingMessageListFetchJob { private readonly messagingPartialMessageListFetchService: MessagingPartialMessageListFetchService, @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity) private readonly connectedAccountRepository: ConnectedAccountRepository, - @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) - private readonly messageChannelRepository: MessageChannelRepository, private readonly messagingTelemetryService: MessagingTelemetryService, + private readonly twentyORMManager: TwentyORMManager, ) {} @Process(MessagingMessageListFetchJob.name) @@ -48,10 +47,16 @@ export class MessagingMessageListFetchJob { workspaceId, }); - const messageChannel = await this.messageChannelRepository.getById( - messageChannelId, - workspaceId, - ); + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); + + const messageChannel = await messageChannelRepository.findOne({ + where: { + id: messageChannelId, + }, + }); if (!messageChannel) { await this.messagingTelemetryService.track({ diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts index becfa6d548c8..935be741e363 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts @@ -4,10 +4,10 @@ import { Process } from 'src/engine/integrations/message-queue/decorators/proces import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { isThrottled } from 'src/modules/connected-account/utils/is-throttled'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; import { MessageChannelSyncStage, MessageChannelWorkspaceEntity, @@ -29,9 +29,8 @@ export class MessagingMessagesImportJob { @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity) private readonly connectedAccountRepository: ConnectedAccountRepository, private readonly gmailFetchMessageContentFromCacheService: MessagingMessagesImportService, - @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) - private readonly messageChannelRepository: MessageChannelRepository, private readonly messagingTelemetryService: MessagingTelemetryService, + private readonly twentyORMManager: TwentyORMManager, ) {} @Process(MessagingMessagesImportJob.name) @@ -44,10 +43,16 @@ export class MessagingMessagesImportJob { messageChannelId, }); - const messageChannel = await this.messageChannelRepository.getById( - messageChannelId, - workspaceId, - ); + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); + + const messageChannel = await messageChannelRepository.findOne({ + where: { + id: messageChannelId, + }, + }); if (!messageChannel) { await this.messagingTelemetryService.track({ diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts index 45e2b4ec264c..a2839cdd92c0 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts @@ -59,13 +59,11 @@ export class MessagingOngoingStaleJob { case MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING: await this.messagingChannelSyncStatusService.schedulePartialMessageListFetch( messageChannel.id, - workspaceId, ); break; case MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING: await this.messagingChannelSyncStatusService.scheduleMessagesImport( messageChannel.id, - workspaceId, ); break; default: diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-error-handling.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-error-handling.service.ts index 52585e3736a1..2a6570961ce6 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-error-handling.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-error-handling.service.ts @@ -3,9 +3,9 @@ import { Injectable } from '@nestjs/common'; import snakeCase from 'lodash.snakecase'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MESSAGING_THROTTLE_MAX_ATTEMPTS } from 'src/modules/messaging/message-import-manager/constants/messaging-throttle-max-attempts'; @@ -28,8 +28,7 @@ export class MessagingErrorHandlingService { private readonly connectedAccountRepository: ConnectedAccountRepository, private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService, private readonly messagingTelemetryService: MessagingTelemetryService, - @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) - private readonly messageChannelRepository: MessageChannelRepository, + private readonly twentyORMManager: TwentyORMManager, ) {} public async handleGmailError( @@ -263,21 +262,18 @@ export class MessagingErrorHandlingService { case 'full-message-list-fetch': await this.messagingChannelSyncStatusService.scheduleFullMessageListFetch( messageChannel.id, - workspaceId, ); break; case 'partial-message-list-fetch': await this.messagingChannelSyncStatusService.schedulePartialMessageListFetch( messageChannel.id, - workspaceId, ); break; case 'messages-import': await this.messagingChannelSyncStatusService.scheduleMessagesImport( messageChannel.id, - workspaceId, ); break; @@ -290,9 +286,17 @@ export class MessagingErrorHandlingService { messageChannel: MessageChannelWorkspaceEntity, workspaceId: string, ): Promise { - await this.messageChannelRepository.incrementThrottleFailureCount( - messageChannel.id, - workspaceId, + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); + + await messageChannelRepository.increment( + { + id: messageChannel.id, + }, + 'throttleFailureCount', + 1, ); await this.messagingTelemetryService.track({ diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts index bda33157baaa..dde37698e9c8 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts @@ -7,10 +7,8 @@ import { Any, EntityManager } from 'typeorm'; import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service'; import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator'; import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum'; -import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; @@ -31,8 +29,6 @@ export class MessagingFullMessageListFetchService { constructor( private readonly gmailClientProvider: MessagingGmailClientProvider, - @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) - private readonly messageChannelRepository: MessageChannelRepository, @InjectCacheStorage(CacheStorageNamespace.Messaging) private readonly cacheStorage: CacheStorageService, private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService, @@ -47,7 +43,6 @@ export class MessagingFullMessageListFetchService { ) { await this.messagingChannelSyncStatusService.markAsMessagesListFetchOngoing( messageChannel.id, - workspaceId, ); const gmailClient: gmail_v1.Gmail = @@ -70,19 +65,23 @@ export class MessagingFullMessageListFetchService { return; } - await this.messageChannelRepository.resetThrottleFailureCount( - messageChannel.id, - workspaceId, - ); + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); - await this.messageChannelRepository.resetSyncStageStartedAt( - messageChannel.id, - workspaceId, + await messageChannelRepository.update( + { + id: messageChannel.id, + }, + { + throttleFailureCount: 0, + syncStageStartedAt: null, + }, ); await this.messagingChannelSyncStatusService.scheduleMessagesImport( messageChannel.id, - workspaceId, ); } @@ -216,11 +215,32 @@ export class MessagingFullMessageListFetchService { ); } - await this.messageChannelRepository.updateLastSyncCursorIfHigher( - messageChannelId, - historyId, - workspaceId, + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); + + const messageChannel = await messageChannelRepository.findOneOrFail( + { + where: { + id: messageChannelId, + }, + }, transactionManager, ); + + const currentSyncCursor = messageChannel.syncCursor; + + if (!currentSyncCursor || historyId > currentSyncCursor) { + await messageChannelRepository.update( + { + id: messageChannel.id, + }, + { + syncCursor: historyId, + }, + transactionManager, + ); + } } } diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts index d39b507545ef..89fcedb6d874 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts @@ -6,13 +6,13 @@ import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator'; import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { BlocklistRepository } from 'src/modules/blocklist/repositories/blocklist.repository'; import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity'; import { EmailAliasManagerService } from 'src/modules/connected-account/email-alias-manager/services/email-alias-manager.service'; import { RefreshAccessTokenService } from 'src/modules/connected-account/refresh-access-token-manager/services/refresh-access-token.service'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; import { MessageChannelSyncStage, @@ -40,12 +40,11 @@ export class MessagingMessagesImportService { private readonly messagingTelemetryService: MessagingTelemetryService, @InjectObjectMetadataRepository(BlocklistWorkspaceEntity) private readonly blocklistRepository: BlocklistRepository, - @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) - private readonly messageChannelRepository: MessageChannelRepository, private readonly emailAliasManagerService: EmailAliasManagerService, private readonly isFeatureEnabledService: IsFeatureEnabledService, @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity) private readonly connectedAccountRepository: ConnectedAccountRepository, + private readonly twentyORMManager: TwentyORMManager, ) {} async processMessageBatchImport( @@ -73,7 +72,6 @@ export class MessagingMessagesImportService { await this.messagingChannelSyncStatusService.markAsMessagesImportOngoing( messageChannel.id, - workspaceId, ); let accessToken: string; @@ -139,7 +137,6 @@ export class MessagingMessagesImportService { if (!messageIdsToFetch?.length) { await this.messagingChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch( messageChannel.id, - workspaceId, ); return await this.trackMessageImportCompleted( @@ -180,23 +177,26 @@ export class MessagingMessagesImportService { ) { await this.messagingChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch( messageChannel.id, - workspaceId, ); } else { await this.messagingChannelSyncStatusService.scheduleMessagesImport( messageChannel.id, - workspaceId, ); } - await this.messageChannelRepository.resetThrottleFailureCount( - messageChannel.id, - workspaceId, - ); + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); - await this.messageChannelRepository.resetSyncStageStartedAt( - messageChannel.id, - workspaceId, + await messageChannelRepository.update( + { + id: messageChannel.id, + }, + { + throttleFailureCount: 0, + syncStageStartedAt: null, + }, ); return await this.trackMessageImportCompleted( diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts index a186f7615ed2..7ab5f5c65250 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts @@ -6,10 +6,8 @@ import { Any } from 'typeorm'; import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service'; import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator'; import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum'; -import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; @@ -26,8 +24,6 @@ export class MessagingPartialMessageListFetchService { constructor( private readonly gmailClientProvider: MessagingGmailClientProvider, - @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) - private readonly messageChannelRepository: MessageChannelRepository, @InjectCacheStorage(CacheStorageNamespace.Messaging) private readonly cacheStorage: CacheStorageService, private readonly gmailErrorHandlingService: MessagingErrorHandlingService, @@ -44,7 +40,6 @@ export class MessagingPartialMessageListFetchService { ): Promise { await this.messagingChannelSyncStatusService.markAsMessagesListFetchOngoing( messageChannel.id, - workspaceId, ); const lastSyncHistoryId = messageChannel.syncCursor; @@ -69,14 +64,19 @@ export class MessagingPartialMessageListFetchService { return; } - await this.messageChannelRepository.resetThrottleFailureCount( - messageChannel.id, - workspaceId, - ); + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); - await this.messageChannelRepository.resetSyncStageStartedAt( - messageChannel.id, - workspaceId, + await messageChannelRepository.update( + { + id: messageChannel.id, + }, + { + throttleFailureCount: 0, + syncStageStartedAt: null, + }, ); if (!historyId) { @@ -92,7 +92,6 @@ export class MessagingPartialMessageListFetchService { await this.messagingChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch( messageChannel.id, - workspaceId, ); return; @@ -147,15 +146,21 @@ export class MessagingPartialMessageListFetchService { `Deleted ${messagesDeleted.length} messages for workspace ${workspaceId} and account ${connectedAccount.id}`, ); - await this.messageChannelRepository.updateLastSyncCursorIfHigher( - messageChannel.id, - historyId, - workspaceId, - ); + const currentSyncCursor = messageChannel.syncCursor; + + if (!currentSyncCursor || historyId > currentSyncCursor) { + await messageChannelRepository.update( + { + id: messageChannel.id, + }, + { + syncCursor: historyId, + }, + ); + } await this.messagingChannelSyncStatusService.scheduleMessagesImport( messageChannel.id, - workspaceId, ); } } diff --git a/packages/twenty-server/src/modules/messaging/message-participant-manager/jobs/messaging-create-company-and-contact-after-sync.job.ts b/packages/twenty-server/src/modules/messaging/message-participant-manager/jobs/messaging-create-company-and-contact-after-sync.job.ts index 4db25adef73b..36be02e7349f 100644 --- a/packages/twenty-server/src/modules/messaging/message-participant-manager/jobs/messaging-create-company-and-contact-after-sync.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-participant-manager/jobs/messaging-create-company-and-contact-after-sync.job.ts @@ -1,15 +1,16 @@ import { Logger } from '@nestjs/common'; +import { Any, IsNull } from 'typeorm'; + import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { FieldActorSource } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { CreateCompanyAndContactService } from 'src/modules/contact-creation-manager/services/create-company-and-contact.service'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; -import { MessageParticipantRepository } from 'src/modules/messaging/common/repositories/message-participant.repository'; import { MessageChannelContactAutoCreationPolicy, MessageChannelWorkspaceEntity, @@ -28,12 +29,9 @@ export class MessagingCreateCompanyAndContactAfterSyncJob { ); constructor( private readonly createCompanyAndContactService: CreateCompanyAndContactService, - @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) - private readonly messageChannelService: MessageChannelRepository, - @InjectObjectMetadataRepository(MessageParticipantWorkspaceEntity) - private readonly messageParticipantRepository: MessageParticipantRepository, @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity) private readonly connectedAccountRepository: ConnectedAccountRepository, + private readonly twentyORMManager: TwentyORMManager, ) {} @Process(MessagingCreateCompanyAndContactAfterSyncJob.name) @@ -45,12 +43,18 @@ export class MessagingCreateCompanyAndContactAfterSyncJob { ); const { workspaceId, messageChannelId } = data; - const messageChannel = await this.messageChannelService.getByIds( - [messageChannelId], - workspaceId, - ); + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); + + const messageChannel = await messageChannelRepository.findOneOrFail({ + where: { + id: messageChannelId, + }, + }); - const { contactAutoCreationPolicy, connectedAccountId } = messageChannel[0]; + const { contactAutoCreationPolicy, connectedAccountId } = messageChannel; if ( contactAutoCreationPolicy === MessageChannelContactAutoCreationPolicy.NONE @@ -69,17 +73,29 @@ export class MessagingCreateCompanyAndContactAfterSyncJob { ); } - const contactsToCreate = + const messageParticipantRepository = + await this.twentyORMManager.getRepository( + 'messageParticipant', + ); + + const directionFilter = contactAutoCreationPolicy === MessageChannelContactAutoCreationPolicy.SENT_AND_RECEIVED - ? await this.messageParticipantRepository.getByMessageChannelIdWithoutPersonIdAndWorkspaceMemberId( - messageChannelId, - workspaceId, - ) - : await this.messageParticipantRepository.getByMessageChannelIdWithoutPersonIdAndWorkspaceMemberIdAndMessageOutgoing( + ? Any(['incoming', 'outgoing']) + : 'outgoing'; + + const contactsToCreate = await messageParticipantRepository.find({ + where: { + message: { + messageChannelMessageAssociations: { messageChannelId, - workspaceId, - ); + }, + direction: directionFilter, + }, + personId: IsNull(), + workspaceMemberId: IsNull(), + }, + }); await this.createCompanyAndContactService.createCompaniesAndContactsAndUpdateParticipants( connectedAccount, diff --git a/packages/twenty-server/src/modules/messaging/monitoring/crons/jobs/messaging-message-channel-sync-status-monitoring.cron.ts b/packages/twenty-server/src/modules/messaging/monitoring/crons/jobs/messaging-message-channel-sync-status-monitoring.cron.ts index fb7561d20bd0..9d403fa09941 100644 --- a/packages/twenty-server/src/modules/messaging/monitoring/crons/jobs/messaging-message-channel-sync-status-monitoring.cron.ts +++ b/packages/twenty-server/src/modules/messaging/monitoring/crons/jobs/messaging-message-channel-sync-status-monitoring.cron.ts @@ -11,8 +11,7 @@ import { import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; -import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; +import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessagingTelemetryService } from 'src/modules/messaging/monitoring/services/messaging-telemetry.service'; @@ -25,9 +24,8 @@ export class MessagingMessageChannelSyncStatusMonitoringCronJob { constructor( @InjectRepository(Workspace, 'core') private readonly workspaceRepository: Repository, - @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) - private readonly messageChannelRepository: MessageChannelRepository, private readonly messagingTelemetryService: MessagingTelemetryService, + private readonly twentyORMGlobalManager: TwentyORMGlobalManager, ) {} @Process(MessagingMessageChannelSyncStatusMonitoringCronJob.name) @@ -46,9 +44,14 @@ export class MessagingMessageChannelSyncStatusMonitoringCronJob { }); for (const activeWorkspace of activeWorkspaces) { - const messageChannels = await this.messageChannelRepository.getAll( - activeWorkspace.id, - ); + const messageChannelRepository = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + 'messageChannel', + activeWorkspace.id, + ); + const messageChannels = await messageChannelRepository.find({ + select: ['id', 'syncStatus', 'connectedAccountId'], + }); for (const messageChannel of messageChannels) { if (!messageChannel.syncStatus) {