From fa6454c59ffe974d6bcbb0d72e3858499dbb7d28 Mon Sep 17 00:00:00 2001 From: bosiraphael Date: Wed, 7 Aug 2024 12:08:56 +0200 Subject: [PATCH 01/17] remove messageThreadRepository --- .../metadata-to-repository.mapping.ts | 2 - .../repositories/message-thread.repository.ts | 67 ------------------- 2 files changed, 69 deletions(-) delete mode 100644 packages/twenty-server/src/modules/messaging/common/repositories/message-thread.repository.ts diff --git a/packages/twenty-server/src/engine/object-metadata-repository/metadata-to-repository.mapping.ts b/packages/twenty-server/src/engine/object-metadata-repository/metadata-to-repository.mapping.ts index 1ead430c1a88..2f203d2fa322 100644 --- a/packages/twenty-server/src/engine/object-metadata-repository/metadata-to-repository.mapping.ts +++ b/packages/twenty-server/src/engine/object-metadata-repository/metadata-to-repository.mapping.ts @@ -4,7 +4,6 @@ import { ConnectedAccountRepository } from 'src/modules/connected-account/reposi import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/common/repositories/message-channel-message-association.repository'; import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; import { MessageParticipantRepository } from 'src/modules/messaging/common/repositories/message-participant.repository'; -import { MessageThreadRepository } from 'src/modules/messaging/common/repositories/message-thread.repository'; import { MessageRepository } from 'src/modules/messaging/common/repositories/message.repository'; import { PersonRepository } from 'src/modules/person/repositories/person.repository'; import { AuditLogRepository } from 'src/modules/timeline/repositiories/audit-log.repository'; @@ -21,7 +20,6 @@ export const metadataToRepositoryMapping = { MessageChannelWorkspaceEntity: MessageChannelRepository, MessageWorkspaceEntity: MessageRepository, MessageParticipantWorkspaceEntity: MessageParticipantRepository, - MessageThreadWorkspaceEntity: MessageThreadRepository, PersonWorkspaceEntity: PersonRepository, TimelineActivityWorkspaceEntity: TimelineActivityRepository, WorkspaceMemberWorkspaceEntity: WorkspaceMemberRepository, diff --git a/packages/twenty-server/src/modules/messaging/common/repositories/message-thread.repository.ts b/packages/twenty-server/src/modules/messaging/common/repositories/message-thread.repository.ts deleted file mode 100644 index bf0e9d7c90af..000000000000 --- a/packages/twenty-server/src/modules/messaging/common/repositories/message-thread.repository.ts +++ /dev/null @@ -1,67 +0,0 @@ -import { Injectable } from '@nestjs/common'; - -import { EntityManager } from 'typeorm'; - -import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; - -@Injectable() -export class MessageThreadRepository { - constructor( - private readonly workspaceDataSourceService: WorkspaceDataSourceService, - ) {} - - public async getOrphanThreadIdsPaginated( - limit: number, - offset: number, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - const orphanThreads = await this.workspaceDataSourceService.executeRawQuery( - `SELECT mt.id - FROM ${dataSourceSchema}."messageThread" mt - LEFT JOIN ${dataSourceSchema}."message" m ON mt.id = m."messageThreadId" - WHERE m."messageThreadId" IS NULL - LIMIT $1 OFFSET $2`, - [limit, offset], - workspaceId, - transactionManager, - ); - - return orphanThreads.map(({ id }) => id); - } - - public async deleteByIds( - messageThreadIds: string[], - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `DELETE FROM ${dataSourceSchema}."messageThread" WHERE id = ANY($1)`, - [messageThreadIds], - workspaceId, - transactionManager, - ); - } - - public async insert( - messageThreadId: string, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `INSERT INTO ${dataSourceSchema}."messageThread" (id) VALUES ($1)`, - [messageThreadId], - workspaceId, - transactionManager, - ); - } -} From d0ee23db6807b22583b82081baa8a0954aec543b Mon Sep 17 00:00:00 2001 From: bosiraphael Date: Wed, 7 Aug 2024 12:45:20 +0200 Subject: [PATCH 02/17] remove messageParticipantRepository and replace queries --- .../metadata-to-repository.mapping.ts | 2 - .../common/messaging-common.module.ts | 4 - .../message-participant.repository.ts | 209 ------------------ ...eate-company-and-contact-after-sync.job.ts | 35 ++- 4 files changed, 24 insertions(+), 226 deletions(-) delete mode 100644 packages/twenty-server/src/modules/messaging/common/repositories/message-participant.repository.ts diff --git a/packages/twenty-server/src/engine/object-metadata-repository/metadata-to-repository.mapping.ts b/packages/twenty-server/src/engine/object-metadata-repository/metadata-to-repository.mapping.ts index 2f203d2fa322..1b1934e4bad3 100644 --- a/packages/twenty-server/src/engine/object-metadata-repository/metadata-to-repository.mapping.ts +++ b/packages/twenty-server/src/engine/object-metadata-repository/metadata-to-repository.mapping.ts @@ -3,7 +3,6 @@ import { CompanyRepository } from 'src/modules/company/repositories/company.repo import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/common/repositories/message-channel-message-association.repository'; import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; -import { MessageParticipantRepository } from 'src/modules/messaging/common/repositories/message-participant.repository'; import { MessageRepository } from 'src/modules/messaging/common/repositories/message.repository'; import { PersonRepository } from 'src/modules/person/repositories/person.repository'; import { AuditLogRepository } from 'src/modules/timeline/repositiories/audit-log.repository'; @@ -19,7 +18,6 @@ export const metadataToRepositoryMapping = { MessageChannelMessageAssociationRepository, MessageChannelWorkspaceEntity: MessageChannelRepository, MessageWorkspaceEntity: MessageRepository, - MessageParticipantWorkspaceEntity: MessageParticipantRepository, PersonWorkspaceEntity: PersonRepository, TimelineActivityWorkspaceEntity: TimelineActivityRepository, WorkspaceMemberWorkspaceEntity: WorkspaceMemberRepository, diff --git a/packages/twenty-server/src/modules/messaging/common/messaging-common.module.ts b/packages/twenty-server/src/modules/messaging/common/messaging-common.module.ts index 29b8dd7c222c..0c2e681ab097 100644 --- a/packages/twenty-server/src/modules/messaging/common/messaging-common.module.ts +++ b/packages/twenty-server/src/modules/messaging/common/messaging-common.module.ts @@ -6,8 +6,6 @@ import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repos import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module'; import { ConnectedAccountModule } from 'src/modules/connected-account/connected-account.module'; import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; -import { MessageParticipantWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-participant.workspace-entity'; -import { MessageThreadWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-thread.workspace-entity'; import { MessageWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message.workspace-entity'; import { PersonWorkspaceEntity } from 'src/modules/person/standard-objects/person.workspace-entity'; @@ -16,9 +14,7 @@ import { PersonWorkspaceEntity } from 'src/modules/person/standard-objects/perso WorkspaceDataSourceModule, ObjectMetadataRepositoryModule.forFeature([ PersonWorkspaceEntity, - MessageParticipantWorkspaceEntity, MessageWorkspaceEntity, - MessageThreadWorkspaceEntity, ]), TypeOrmModule.forFeature([FeatureFlagEntity], 'core'), ConnectedAccountModule, diff --git a/packages/twenty-server/src/modules/messaging/common/repositories/message-participant.repository.ts b/packages/twenty-server/src/modules/messaging/common/repositories/message-participant.repository.ts deleted file mode 100644 index 2cb2214f1168..000000000000 --- a/packages/twenty-server/src/modules/messaging/common/repositories/message-participant.repository.ts +++ /dev/null @@ -1,209 +0,0 @@ -import { Injectable } from '@nestjs/common'; - -import { EntityManager } from 'typeorm'; - -import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; -import { MessageParticipantWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-participant.workspace-entity'; -import { ParticipantWithId } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message'; - -@Injectable() -export class MessageParticipantRepository { - constructor( - private readonly workspaceDataSourceService: WorkspaceDataSourceService, - ) {} - - public async getByHandles( - handles: string[], - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - return await this.workspaceDataSourceService.executeRawQuery( - `SELECT * FROM ${dataSourceSchema}."messageParticipant" WHERE "handle" = ANY($1)`, - [handles], - workspaceId, - transactionManager, - ); - } - - public async updateParticipantsPersonId( - participantIds: string[], - personId: string, - workspaceId: string, - transactionManager?: EntityManager, - ) { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `UPDATE ${dataSourceSchema}."messageParticipant" SET "personId" = $1 WHERE "id" = ANY($2)`, - [personId, participantIds], - workspaceId, - transactionManager, - ); - } - - public async updateParticipantsPersonIdAndReturn( - participantIds: string[], - personId: string, - workspaceId: string, - transactionManager?: EntityManager, - ) { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - return await this.workspaceDataSourceService.executeRawQuery( - `UPDATE ${dataSourceSchema}."messageParticipant" SET "personId" = $1 WHERE "id" = ANY($2) RETURNING *`, - [personId, participantIds], - workspaceId, - transactionManager, - ); - } - - public async updateParticipantsWorkspaceMemberId( - participantIds: string[], - workspaceMemberId: string, - workspaceId: string, - transactionManager?: EntityManager, - ) { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `UPDATE ${dataSourceSchema}."messageParticipant" SET "workspaceMemberId" = $1 WHERE "id" = ANY($2)`, - [workspaceMemberId, participantIds], - workspaceId, - transactionManager, - ); - } - - public async removePersonIdByHandle( - handle: string, - workspaceId: string, - transactionManager?: EntityManager, - ) { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `UPDATE ${dataSourceSchema}."messageParticipant" SET "personId" = NULL WHERE "handle" = $1`, - [handle], - workspaceId, - transactionManager, - ); - } - - public async removeWorkspaceMemberIdByHandle( - handle: string, - workspaceId: string, - transactionManager?: EntityManager, - ) { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `UPDATE ${dataSourceSchema}."messageParticipant" SET "workspaceMemberId" = NULL WHERE "handle" = $1`, - [handle], - workspaceId, - transactionManager, - ); - } - - public async getByMessageChannelIdWithoutPersonIdAndWorkspaceMemberIdAndMessageOutgoing( - messageChannelId: string, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - if (!messageChannelId || !workspaceId) { - return []; - } - - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - const messageParticipants: ParticipantWithId[] = - await this.workspaceDataSourceService.executeRawQuery( - `SELECT "messageParticipant".id, - "messageParticipant"."role", - "messageParticipant"."handle", - "messageParticipant"."displayName", - "messageParticipant"."personId", - "messageParticipant"."workspaceMemberId", - "messageParticipant"."messageId" - FROM ${dataSourceSchema}."messageParticipant" "messageParticipant" - LEFT JOIN ${dataSourceSchema}."message" ON "messageParticipant"."messageId" = ${dataSourceSchema}."message"."id" - LEFT JOIN ${dataSourceSchema}."messageChannelMessageAssociation" ON ${dataSourceSchema}."messageChannelMessageAssociation"."messageId" = ${dataSourceSchema}."message"."id" - WHERE ${dataSourceSchema}."messageChannelMessageAssociation"."messageChannelId" = $1 - AND "messageParticipant"."personId" IS NULL - AND "messageParticipant"."workspaceMemberId" IS NULL - AND ${dataSourceSchema}."message"."direction" = 'outgoing'`, - [messageChannelId], - workspaceId, - transactionManager, - ); - - return messageParticipants; - } - - public async getByMessageChannelIdWithoutPersonIdAndWorkspaceMemberId( - messageChannelId: string, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - if (!messageChannelId || !workspaceId) { - return []; - } - - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - const messageParticipants: ParticipantWithId[] = - await this.workspaceDataSourceService.executeRawQuery( - `SELECT "messageParticipant".id, - "messageParticipant"."role", - "messageParticipant"."handle", - "messageParticipant"."displayName", - "messageParticipant"."personId", - "messageParticipant"."workspaceMemberId", - "messageParticipant"."messageId" - FROM ${dataSourceSchema}."messageParticipant" "messageParticipant" - LEFT JOIN ${dataSourceSchema}."message" ON "messageParticipant"."messageId" = ${dataSourceSchema}."message"."id" - LEFT JOIN ${dataSourceSchema}."messageChannelMessageAssociation" ON ${dataSourceSchema}."messageChannelMessageAssociation"."messageId" = ${dataSourceSchema}."message"."id" - WHERE ${dataSourceSchema}."messageChannelMessageAssociation"."messageChannelId" = $1 - AND "messageParticipant"."personId" IS NULL - AND "messageParticipant"."workspaceMemberId" IS NULL`, - [messageChannelId], - workspaceId, - transactionManager, - ); - - return messageParticipants; - } - - public async getWithoutPersonIdAndWorkspaceMemberId( - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - if (!workspaceId) { - throw new Error('WorkspaceId is required'); - } - - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - const messageParticipants: ParticipantWithId[] = - await this.workspaceDataSourceService.executeRawQuery( - `SELECT "messageParticipant".* - FROM ${dataSourceSchema}."messageParticipant" "messageParticipant" - WHERE "messageParticipant"."personId" IS NULL - AND "messageParticipant"."workspaceMemberId" IS NULL`, - [], - workspaceId, - transactionManager, - ); - - return messageParticipants; - } -} diff --git a/packages/twenty-server/src/modules/messaging/message-participant-manager/jobs/messaging-create-company-and-contact-after-sync.job.ts b/packages/twenty-server/src/modules/messaging/message-participant-manager/jobs/messaging-create-company-and-contact-after-sync.job.ts index 620432e06848..06cd7321cb8d 100644 --- a/packages/twenty-server/src/modules/messaging/message-participant-manager/jobs/messaging-create-company-and-contact-after-sync.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-participant-manager/jobs/messaging-create-company-and-contact-after-sync.job.ts @@ -1,14 +1,16 @@ import { Logger } from '@nestjs/common'; +import { Any, IsNull } from 'typeorm'; + import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { CreateCompanyAndContactService } from 'src/modules/contact-creation-manager/services/create-company-and-contact.service'; import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; -import { MessageParticipantRepository } from 'src/modules/messaging/common/repositories/message-participant.repository'; import { MessageChannelContactAutoCreationPolicy, MessageChannelWorkspaceEntity, @@ -29,10 +31,9 @@ export class MessagingCreateCompanyAndContactAfterSyncJob { private readonly createCompanyAndContactService: CreateCompanyAndContactService, @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) private readonly messageChannelService: MessageChannelRepository, - @InjectObjectMetadataRepository(MessageParticipantWorkspaceEntity) - private readonly messageParticipantRepository: MessageParticipantRepository, @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity) private readonly connectedAccountRepository: ConnectedAccountRepository, + private readonly twentyORMManager: TwentyORMManager, ) {} @Process(MessagingCreateCompanyAndContactAfterSyncJob.name) @@ -68,17 +69,29 @@ export class MessagingCreateCompanyAndContactAfterSyncJob { ); } - const contactsToCreate = + const messageParticipantRepository = + await this.twentyORMManager.getRepository( + 'messageParticipant', + ); + + const directionFilter = contactAutoCreationPolicy === MessageChannelContactAutoCreationPolicy.SENT_AND_RECEIVED - ? await this.messageParticipantRepository.getByMessageChannelIdWithoutPersonIdAndWorkspaceMemberId( - messageChannelId, - workspaceId, - ) - : await this.messageParticipantRepository.getByMessageChannelIdWithoutPersonIdAndWorkspaceMemberIdAndMessageOutgoing( + ? Any(['incoming', 'outgoing']) + : 'outgoing'; + + const contactsToCreate = await messageParticipantRepository.find({ + where: { + message: { + messageChannelMessageAssociations: { messageChannelId, - workspaceId, - ); + }, + direction: directionFilter, + }, + personId: IsNull(), + workspaceMemberId: IsNull(), + }, + }); await this.createCompanyAndContactService.createCompaniesAndContactsAndUpdateParticipants( connectedAccount, From 8e33133d5cf6376b1fb4eb677778698b526ee9bd Mon Sep 17 00:00:00 2001 From: bosiraphael Date: Wed, 7 Aug 2024 14:21:59 +0200 Subject: [PATCH 03/17] remove messageChannelMessageAssociationRepository --- .../metadata-to-repository.mapping.ts | 4 +- ...ging-blocklist-item-delete-messages.job.ts | 27 +++--- .../messaging-query-hook.module.ts | 4 +- ...-channel-message-association.repository.ts | 87 ------------------- .../gmail/messaging-gmail-driver.module.ts | 2 - 5 files changed, 19 insertions(+), 105 deletions(-) delete mode 100644 packages/twenty-server/src/modules/messaging/common/repositories/message-channel-message-association.repository.ts diff --git a/packages/twenty-server/src/engine/object-metadata-repository/metadata-to-repository.mapping.ts b/packages/twenty-server/src/engine/object-metadata-repository/metadata-to-repository.mapping.ts index 1b1934e4bad3..5f23d3da1b14 100644 --- a/packages/twenty-server/src/engine/object-metadata-repository/metadata-to-repository.mapping.ts +++ b/packages/twenty-server/src/engine/object-metadata-repository/metadata-to-repository.mapping.ts @@ -1,7 +1,6 @@ import { BlocklistRepository } from 'src/modules/blocklist/repositories/blocklist.repository'; import { CompanyRepository } from 'src/modules/company/repositories/company.repository'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; -import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/common/repositories/message-channel-message-association.repository'; import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; import { MessageRepository } from 'src/modules/messaging/common/repositories/message.repository'; import { PersonRepository } from 'src/modules/person/repositories/person.repository'; @@ -14,8 +13,7 @@ export const metadataToRepositoryMapping = { BlocklistWorkspaceEntity: BlocklistRepository, CompanyWorkspaceEntity: CompanyRepository, ConnectedAccountWorkspaceEntity: ConnectedAccountRepository, - MessageChannelMessageAssociationWorkspaceEntity: - MessageChannelMessageAssociationRepository, + MessageChannelWorkspaceEntity: MessageChannelRepository, MessageWorkspaceEntity: MessageRepository, PersonWorkspaceEntity: PersonRepository, diff --git a/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job.ts b/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job.ts index a2941c679d1e..8e8165f19ccd 100644 --- a/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job.ts +++ b/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job.ts @@ -1,12 +1,14 @@ import { Logger, Scope } from '@nestjs/common'; +import { Any } from 'typeorm'; + import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { BlocklistRepository } from 'src/modules/blocklist/repositories/blocklist.repository'; import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity'; -import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/common/repositories/message-channel-message-association.repository'; import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; @@ -27,13 +29,10 @@ export class BlocklistItemDeleteMessagesJob { constructor( @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) private readonly messageChannelRepository: MessageChannelRepository, - @InjectObjectMetadataRepository( - MessageChannelMessageAssociationWorkspaceEntity, - ) - private readonly messageChannelMessageAssociationRepository: MessageChannelMessageAssociationRepository, @InjectObjectMetadataRepository(BlocklistWorkspaceEntity) private readonly blocklistRepository: BlocklistRepository, private readonly threadCleanerService: MessagingMessageCleanerService, + private readonly twentyORMManager: TwentyORMManager, ) {} @Process(BlocklistItemDeleteMessagesJob.name) @@ -75,13 +74,21 @@ export class BlocklistItemDeleteMessagesJob { const rolesToDelete: ('from' | 'to')[] = ['from', 'to']; + const messageChannelMessageAssociationRepository = + await this.twentyORMManager.getRepository( + 'messageChannelMessageAssociation', + ); + for (const messageChannelId of messageChannelIds) { - await this.messageChannelMessageAssociationRepository.deleteByMessageParticipantHandleAndMessageChannelIdAndRoles( - handle, + messageChannelMessageAssociationRepository.delete({ messageChannelId, - rolesToDelete, - workspaceId, - ); + message: { + messageParticipants: { + handle, + role: Any(rolesToDelete), + }, + }, + }); } await this.threadCleanerService.cleanWorkspaceThreads(workspaceId); diff --git a/packages/twenty-server/src/modules/messaging/common/query-hooks/messaging-query-hook.module.ts b/packages/twenty-server/src/modules/messaging/common/query-hooks/messaging-query-hook.module.ts index 4268de828017..a6e369c9bc3b 100644 --- a/packages/twenty-server/src/modules/messaging/common/query-hooks/messaging-query-hook.module.ts +++ b/packages/twenty-server/src/modules/messaging/common/query-hooks/messaging-query-hook.module.ts @@ -1,18 +1,16 @@ import { Module } from '@nestjs/common'; import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module'; -import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { CanAccessMessageThreadService } from 'src/modules/messaging/common/query-hooks/message/can-access-message-thread.service'; import { MessageFindManyPreQueryHook } from 'src/modules/messaging/common/query-hooks/message/message-find-many.pre-query.hook'; import { MessageFindOnePreQueryHook } from 'src/modules/messaging/common/query-hooks/message/message-find-one.pre-query-hook'; -import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity'; @Module({ imports: [ ObjectMetadataRepositoryModule.forFeature([ - MessageChannelMessageAssociationWorkspaceEntity, MessageChannelWorkspaceEntity, ConnectedAccountWorkspaceEntity, WorkspaceMemberWorkspaceEntity, diff --git a/packages/twenty-server/src/modules/messaging/common/repositories/message-channel-message-association.repository.ts b/packages/twenty-server/src/modules/messaging/common/repositories/message-channel-message-association.repository.ts deleted file mode 100644 index 0013e036f5e7..000000000000 --- a/packages/twenty-server/src/modules/messaging/common/repositories/message-channel-message-association.repository.ts +++ /dev/null @@ -1,87 +0,0 @@ -import { Injectable } from '@nestjs/common'; - -import { EntityManager } from 'typeorm'; - -import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; - -@Injectable() -export class MessageChannelMessageAssociationRepository { - constructor( - private readonly workspaceDataSourceService: WorkspaceDataSourceService, - ) {} - - public async deleteByMessageParticipantHandleAndMessageChannelIdAndRoles( - messageParticipantHandle: string, - messageChannelId: string, - rolesToDelete: ('from' | 'to' | 'cc' | 'bcc')[], - workspaceId: string, - transactionManager?: EntityManager, - ) { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - const isHandleDomain = messageParticipantHandle.startsWith('@'); - - const messageChannel = - await this.workspaceDataSourceService.executeRawQuery( - `SELECT * FROM ${dataSourceSchema}."messageChannel" - WHERE "id" = $1`, - [messageChannelId], - workspaceId, - transactionManager, - ); - - const messageChannelHandle = messageChannel[0].handle; - - const messageChannelMessageAssociationIdsToDelete = - await this.workspaceDataSourceService.executeRawQuery( - `SELECT "messageChannelMessageAssociation".id - FROM ${dataSourceSchema}."messageChannelMessageAssociation" "messageChannelMessageAssociation" - JOIN ${dataSourceSchema}."message" ON "messageChannelMessageAssociation"."messageId" = ${dataSourceSchema}."message"."id" - JOIN ${dataSourceSchema}."messageParticipant" "messageParticipant" ON ${dataSourceSchema}."message"."id" = "messageParticipant"."messageId" - WHERE "messageParticipant"."handle" != $1 - AND "messageParticipant"."handle" ${isHandleDomain ? '~*' : '='} $2 - AND "messageParticipant"."role" = ANY($3) - AND "messageChannelMessageAssociation"."messageChannelId" = $4`, - [ - messageChannelHandle, - isHandleDomain - ? // eslint-disable-next-line no-useless-escape - `.+@(.+\.)?${messageParticipantHandle.slice(1)}` - : messageParticipantHandle, - rolesToDelete, - messageChannelId, - ], - workspaceId, - transactionManager, - ); - - const messageChannelMessageAssociationIdsToDeleteArray = - messageChannelMessageAssociationIdsToDelete.map( - (messageChannelMessageAssociation: { id: string }) => - messageChannelMessageAssociation.id, - ); - - await this.deleteByIds( - messageChannelMessageAssociationIdsToDeleteArray, - workspaceId, - transactionManager, - ); - } - - public async deleteByIds( - ids: string[], - workspaceId: string, - transactionManager?: EntityManager, - ) { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `DELETE FROM ${dataSourceSchema}."messageChannelMessageAssociation" WHERE "id" = ANY($1)`, - [ids], - workspaceId, - transactionManager, - ); - } -} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/messaging-gmail-driver.module.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/messaging-gmail-driver.module.ts index 008bc157614f..4c4270f91685 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/messaging-gmail-driver.module.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/messaging-gmail-driver.module.ts @@ -12,7 +12,6 @@ import { EmailAliasManagerModule } from 'src/modules/connected-account/email-ali import { OAuth2ClientManagerModule } from 'src/modules/connected-account/oauth2-client-manager/oauth2-client-manager.module'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { MessagingCommonModule } from 'src/modules/messaging/common/messaging-common.module'; -import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessagingGmailClientProvider } from 'src/modules/messaging/message-import-manager/drivers/gmail/providers/messaging-gmail-client.provider'; import { MessagingGmailFetchByBatchesService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-by-batch.service'; @@ -30,7 +29,6 @@ import { MessageParticipantManagerModule } from 'src/modules/messaging/message-p ObjectMetadataRepositoryModule.forFeature([ ConnectedAccountWorkspaceEntity, MessageChannelWorkspaceEntity, - MessageChannelMessageAssociationWorkspaceEntity, BlocklistWorkspaceEntity, ]), MessagingCommonModule, From 51da760ec94d3c04804cc0ffbe664fef7c6d153e Mon Sep 17 00:00:00 2001 From: bosiraphael Date: Wed, 7 Aug 2024 14:26:47 +0200 Subject: [PATCH 04/17] remove message repository --- .../metadata-to-repository.mapping.ts | 2 - .../common/messaging-common.module.ts | 6 +- .../common/repositories/message.repository.ts | 137 ------------------ 3 files changed, 1 insertion(+), 144 deletions(-) delete mode 100644 packages/twenty-server/src/modules/messaging/common/repositories/message.repository.ts diff --git a/packages/twenty-server/src/engine/object-metadata-repository/metadata-to-repository.mapping.ts b/packages/twenty-server/src/engine/object-metadata-repository/metadata-to-repository.mapping.ts index 5f23d3da1b14..28fe7f6fd07d 100644 --- a/packages/twenty-server/src/engine/object-metadata-repository/metadata-to-repository.mapping.ts +++ b/packages/twenty-server/src/engine/object-metadata-repository/metadata-to-repository.mapping.ts @@ -2,7 +2,6 @@ import { BlocklistRepository } from 'src/modules/blocklist/repositories/blocklis import { CompanyRepository } from 'src/modules/company/repositories/company.repository'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; -import { MessageRepository } from 'src/modules/messaging/common/repositories/message.repository'; import { PersonRepository } from 'src/modules/person/repositories/person.repository'; import { AuditLogRepository } from 'src/modules/timeline/repositiories/audit-log.repository'; import { TimelineActivityRepository } from 'src/modules/timeline/repositiories/timeline-activity.repository'; @@ -15,7 +14,6 @@ export const metadataToRepositoryMapping = { ConnectedAccountWorkspaceEntity: ConnectedAccountRepository, MessageChannelWorkspaceEntity: MessageChannelRepository, - MessageWorkspaceEntity: MessageRepository, PersonWorkspaceEntity: PersonRepository, TimelineActivityWorkspaceEntity: TimelineActivityRepository, WorkspaceMemberWorkspaceEntity: WorkspaceMemberRepository, diff --git a/packages/twenty-server/src/modules/messaging/common/messaging-common.module.ts b/packages/twenty-server/src/modules/messaging/common/messaging-common.module.ts index 0c2e681ab097..4a2d1a867099 100644 --- a/packages/twenty-server/src/modules/messaging/common/messaging-common.module.ts +++ b/packages/twenty-server/src/modules/messaging/common/messaging-common.module.ts @@ -6,16 +6,12 @@ import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repos import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module'; import { ConnectedAccountModule } from 'src/modules/connected-account/connected-account.module'; import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; -import { MessageWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message.workspace-entity'; import { PersonWorkspaceEntity } from 'src/modules/person/standard-objects/person.workspace-entity'; @Module({ imports: [ WorkspaceDataSourceModule, - ObjectMetadataRepositoryModule.forFeature([ - PersonWorkspaceEntity, - MessageWorkspaceEntity, - ]), + ObjectMetadataRepositoryModule.forFeature([PersonWorkspaceEntity]), TypeOrmModule.forFeature([FeatureFlagEntity], 'core'), ConnectedAccountModule, ], diff --git a/packages/twenty-server/src/modules/messaging/common/repositories/message.repository.ts b/packages/twenty-server/src/modules/messaging/common/repositories/message.repository.ts deleted file mode 100644 index 5f1d3eb25a4c..000000000000 --- a/packages/twenty-server/src/modules/messaging/common/repositories/message.repository.ts +++ /dev/null @@ -1,137 +0,0 @@ -import { Injectable } from '@nestjs/common'; - -import { EntityManager } from 'typeorm'; - -import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; -import { MessageWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message.workspace-entity'; - -@Injectable() -export class MessageRepository { - constructor( - private readonly workspaceDataSourceService: WorkspaceDataSourceService, - ) {} - - public async getNonAssociatedMessageIdsPaginated( - limit: number, - offset: number, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - const nonAssociatedMessages = - await this.workspaceDataSourceService.executeRawQuery( - `SELECT m.id FROM ${dataSourceSchema}."message" m - LEFT JOIN ${dataSourceSchema}."messageChannelMessageAssociation" mcma - ON m.id = mcma."messageId" - WHERE mcma.id IS NULL - LIMIT $1 OFFSET $2`, - [limit, offset], - workspaceId, - transactionManager, - ); - - return nonAssociatedMessages.map(({ id }) => id); - } - - public async getFirstOrNullByHeaderMessageId( - headerMessageId: string, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - const messages = await this.workspaceDataSourceService.executeRawQuery( - `SELECT * FROM ${dataSourceSchema}."message" WHERE "headerMessageId" = $1 LIMIT 1`, - [headerMessageId], - workspaceId, - transactionManager, - ); - - if (!messages || messages.length === 0) { - return null; - } - - return messages[0]; - } - - public async getByIds( - messageIds: string[], - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - return await this.workspaceDataSourceService.executeRawQuery( - `SELECT * FROM ${dataSourceSchema}."message" WHERE "id" = ANY($1)`, - [messageIds], - workspaceId, - transactionManager, - ); - } - - public async deleteByIds( - messageIds: string[], - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `DELETE FROM ${dataSourceSchema}."message" WHERE "id" = ANY($1)`, - [messageIds], - workspaceId, - transactionManager, - ); - } - - public async getByMessageThreadIds( - messageThreadIds: string[], - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - return await this.workspaceDataSourceService.executeRawQuery( - `SELECT * FROM ${dataSourceSchema}."message" WHERE "messageThreadId" = ANY($1)`, - [messageThreadIds], - workspaceId, - transactionManager, - ); - } - - public async insert( - id: string, - headerMessageId: string, - subject: string, - receivedAt: Date, - messageDirection: string, - messageThreadId: string, - text: string, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `INSERT INTO ${dataSourceSchema}."message" ("id", "headerMessageId", "subject", "receivedAt", "direction", "messageThreadId", "text") VALUES ($1, $2, $3, $4, $5, $6, $7)`, - [ - id, - headerMessageId, - subject, - receivedAt, - messageDirection, - messageThreadId, - text, - ], - workspaceId, - transactionManager, - ); - } -} From fe9e334a03029c0168f390950c6cfd6e18c6d622 Mon Sep 17 00:00:00 2001 From: bosiraphael Date: Wed, 7 Aug 2024 14:35:32 +0200 Subject: [PATCH 05/17] replace raw queries from messageChannelRepository --- .../auth/services/google-apis.service.ts | 23 ++++++-- .../message-channel.repository.ts | 57 +------------------ 2 files changed, 20 insertions(+), 60 deletions(-) diff --git a/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts b/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts index 4f0efc5f3a5a..fd445cecbd23 100644 --- a/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts +++ b/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts @@ -25,6 +25,7 @@ import { } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; import { + MessageChannelSyncStage, MessageChannelSyncStatus, MessageChannelType, MessageChannelVisibility, @@ -88,6 +89,11 @@ export class GoogleAPIsService { 'calendarChannel', ); + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); + const workspaceDataSource = await this.twentyORMManager.getDatasource(); await workspaceDataSource.transaction(async (manager: EntityManager) => { @@ -105,7 +111,7 @@ export class GoogleAPIsService { manager, ); - await this.messageChannelRepository.create( + await messageChannelRepository.save( { id: v4(), connectedAccountId: newOrExistingConnectedAccountId, @@ -115,7 +121,7 @@ export class GoogleAPIsService { messageVisibility || MessageChannelVisibility.SHARE_EVERYTHING, syncStatus: MessageChannelSyncStatus.ONGOING, }, - workspaceId, + {}, manager, ); @@ -159,9 +165,16 @@ export class GoogleAPIsService { newOrExistingConnectedAccountId, ); - await this.messageChannelRepository.resetSync( - newOrExistingConnectedAccountId, - workspaceId, + await messageChannelRepository.update( + { + connectedAccountId: newOrExistingConnectedAccountId, + }, + { + syncStage: MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING, + syncStatus: null, + syncCursor: '', + syncStageStartedAt: null, + }, manager, ); } diff --git a/packages/twenty-server/src/modules/messaging/common/repositories/message-channel.repository.ts b/packages/twenty-server/src/modules/messaging/common/repositories/message-channel.repository.ts index 5b3cb4a45202..29aca452a3e5 100644 --- a/packages/twenty-server/src/modules/messaging/common/repositories/message-channel.repository.ts +++ b/packages/twenty-server/src/modules/messaging/common/repositories/message-channel.repository.ts @@ -4,9 +4,9 @@ import { EntityManager } from 'typeorm'; import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; import { - MessageChannelWorkspaceEntity, - MessageChannelSyncStatus, MessageChannelSyncStage, + MessageChannelSyncStatus, + MessageChannelWorkspaceEntity, } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; @Injectable() @@ -15,59 +15,6 @@ export class MessageChannelRepository { private readonly workspaceDataSourceService: WorkspaceDataSourceService, ) {} - public async create( - messageChannel: Pick< - MessageChannelWorkspaceEntity, - | 'id' - | 'connectedAccountId' - | 'type' - | 'handle' - | 'visibility' - | 'syncStatus' - >, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `INSERT INTO ${dataSourceSchema}."messageChannel" ("id", "connectedAccountId", "type", "handle", "visibility", "syncStatus") - VALUES ($1, $2, $3, $4, $5, $6)`, - [ - messageChannel.id, - messageChannel.connectedAccountId, - messageChannel.type, - messageChannel.handle, - messageChannel.visibility, - messageChannel.syncStatus, - ], - workspaceId, - transactionManager, - ); - } - - public async resetSync( - connectedAccountId: string, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `UPDATE ${dataSourceSchema}."messageChannel" - SET "syncStatus" = NULL, - "syncStage" = '${MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING}', - "syncCursor" = '', - "syncStageStartedAt" = NULL - WHERE "connectedAccountId" = $1`, - [connectedAccountId], - workspaceId, - transactionManager, - ); - } - public async getAll( workspaceId: string, transactionManager?: EntityManager, From dfc796bc9e09b45d8d47734cf4afeafc879c6f2d Mon Sep 17 00:00:00 2001 From: bosiraphael Date: Wed, 7 Aug 2024 14:37:53 +0200 Subject: [PATCH 06/17] replace raw queries from messageChannelRepository --- .../auth/services/google-apis.service.ts | 13 ++--- .../message-channel.repository.ts | 48 ------------------- 2 files changed, 5 insertions(+), 56 deletions(-) diff --git a/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts b/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts index fd445cecbd23..157c82ab7fd7 100644 --- a/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts +++ b/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts @@ -23,7 +23,6 @@ import { ConnectedAccountProvider, ConnectedAccountWorkspaceEntity, } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; import { MessageChannelSyncStage, MessageChannelSyncStatus, @@ -48,8 +47,6 @@ export class GoogleAPIsService { private readonly environmentService: EnvironmentService, @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity) private readonly connectedAccountRepository: ConnectedAccountRepository, - @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) - private readonly messageChannelRepository: MessageChannelRepository, private readonly accountsToReconnectService: AccountsToReconnectService, ) {} @@ -181,11 +178,11 @@ export class GoogleAPIsService { }); if (this.environmentService.get('MESSAGING_PROVIDER_GMAIL_ENABLED')) { - const messageChannels = - await this.messageChannelRepository.getByConnectedAccountId( - newOrExistingConnectedAccountId, - workspaceId, - ); + const messageChannels = await messageChannelRepository.find({ + where: { + connectedAccountId: newOrExistingConnectedAccountId, + }, + }); for (const messageChannel of messageChannels) { await this.messageQueueService.add( diff --git a/packages/twenty-server/src/modules/messaging/common/repositories/message-channel.repository.ts b/packages/twenty-server/src/modules/messaging/common/repositories/message-channel.repository.ts index 29aca452a3e5..5c1a66e8257d 100644 --- a/packages/twenty-server/src/modules/messaging/common/repositories/message-channel.repository.ts +++ b/packages/twenty-server/src/modules/messaging/common/repositories/message-channel.repository.ts @@ -30,54 +30,6 @@ export class MessageChannelRepository { ); } - public async getByConnectedAccountId( - connectedAccountId: string, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - return await this.workspaceDataSourceService.executeRawQuery( - `SELECT * FROM ${dataSourceSchema}."messageChannel" WHERE "connectedAccountId" = $1 AND "type" = 'email' LIMIT 1`, - [connectedAccountId], - workspaceId, - transactionManager, - ); - } - - public async getFirstByConnectedAccountIdOrFail( - connectedAccountId: string, - workspaceId: string, - ): Promise { - const messageChannel = await this.getFirstByConnectedAccountId( - connectedAccountId, - workspaceId, - ); - - if (!messageChannel) { - throw new Error( - `Message channel for connected account ${connectedAccountId} not found in workspace ${workspaceId}`, - ); - } - - return messageChannel; - } - - public async getFirstByConnectedAccountId( - connectedAccountId: string, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const messageChannels = await this.getByConnectedAccountId( - connectedAccountId, - workspaceId, - transactionManager, - ); - - return messageChannels[0]; - } - public async getByIds( ids: string[], workspaceId: string, From 15781b5773cfa14097d263d81235ba959dd2bd91 Mon Sep 17 00:00:00 2001 From: bosiraphael Date: Wed, 7 Aug 2024 15:31:31 +0200 Subject: [PATCH 07/17] replace raw queries from messageChannelRepository --- ...ging-blocklist-item-delete-messages.job.ts | 33 +++++++++++-------- .../message-channel.repository.ts | 21 ------------ 2 files changed, 19 insertions(+), 35 deletions(-) diff --git a/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job.ts b/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job.ts index 8e8165f19ccd..9add24259a38 100644 --- a/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job.ts +++ b/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job.ts @@ -64,12 +64,19 @@ export class BlocklistItemDeleteMessagesJob { ); } - const messageChannels = - await this.messageChannelRepository.getIdsByWorkspaceMemberId( - workspaceMemberId, - workspaceId, + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', ); + const messageChannels = await messageChannelRepository.find({ + where: { + connectedAccount: { + accountOwnerId: workspaceMemberId, + }, + }, + }); + const messageChannelIds = messageChannels.map(({ id }) => id); const rolesToDelete: ('from' | 'to')[] = ['from', 'to']; @@ -79,17 +86,15 @@ export class BlocklistItemDeleteMessagesJob { 'messageChannelMessageAssociation', ); - for (const messageChannelId of messageChannelIds) { - messageChannelMessageAssociationRepository.delete({ - messageChannelId, - message: { - messageParticipants: { - handle, - role: Any(rolesToDelete), - }, + messageChannelMessageAssociationRepository.delete({ + messageChannelId: Any(messageChannelIds), + message: { + messageParticipants: { + handle, + role: Any(rolesToDelete), }, - }); - } + }, + }); await this.threadCleanerService.cleanWorkspaceThreads(workspaceId); diff --git a/packages/twenty-server/src/modules/messaging/common/repositories/message-channel.repository.ts b/packages/twenty-server/src/modules/messaging/common/repositories/message-channel.repository.ts index 5c1a66e8257d..99b7fbb06b91 100644 --- a/packages/twenty-server/src/modules/messaging/common/repositories/message-channel.repository.ts +++ b/packages/twenty-server/src/modules/messaging/common/repositories/message-channel.repository.ts @@ -65,27 +65,6 @@ export class MessageChannelRepository { return messageChannels[0]; } - public async getIdsByWorkspaceMemberId( - workspaceMemberId: string, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - const messageChannelIds = - await this.workspaceDataSourceService.executeRawQuery( - `SELECT "messageChannel".id FROM ${dataSourceSchema}."messageChannel" "messageChannel" - JOIN ${dataSourceSchema}."connectedAccount" ON "messageChannel"."connectedAccountId" = ${dataSourceSchema}."connectedAccount"."id" - WHERE ${dataSourceSchema}."connectedAccount"."accountOwnerId" = $1`, - [workspaceMemberId], - workspaceId, - transactionManager, - ); - - return messageChannelIds; - } - public async updateSyncStatus( id: string, syncStatus: MessageChannelSyncStatus, From c6208bd536cb69109f3509b2bc996fd5bca47538 Mon Sep 17 00:00:00 2001 From: bosiraphael Date: Wed, 7 Aug 2024 15:38:54 +0200 Subject: [PATCH 08/17] simplify code --- ...ging-blocklist-item-delete-messages.job.ts | 29 +++++-------------- 1 file changed, 7 insertions(+), 22 deletions(-) diff --git a/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job.ts b/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job.ts index 9add24259a38..3c7d37204761 100644 --- a/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job.ts +++ b/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job.ts @@ -9,9 +9,7 @@ import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repos import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { BlocklistRepository } from 'src/modules/blocklist/repositories/blocklist.repository'; import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; -import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessagingMessageCleanerService } from 'src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service'; export type BlocklistItemDeleteMessagesJobData = { @@ -27,8 +25,6 @@ export class BlocklistItemDeleteMessagesJob { private readonly logger = new Logger(BlocklistItemDeleteMessagesJob.name); constructor( - @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) - private readonly messageChannelRepository: MessageChannelRepository, @InjectObjectMetadataRepository(BlocklistWorkspaceEntity) private readonly blocklistRepository: BlocklistRepository, private readonly threadCleanerService: MessagingMessageCleanerService, @@ -64,30 +60,19 @@ export class BlocklistItemDeleteMessagesJob { ); } - const messageChannelRepository = - await this.twentyORMManager.getRepository( - 'messageChannel', + const messageChannelMessageAssociationRepository = + await this.twentyORMManager.getRepository( + 'messageChannelMessageAssociation', ); - const messageChannels = await messageChannelRepository.find({ - where: { + const rolesToDelete: ('from' | 'to')[] = ['from', 'to']; + + await messageChannelMessageAssociationRepository.delete({ + messageChannel: { connectedAccount: { accountOwnerId: workspaceMemberId, }, }, - }); - - const messageChannelIds = messageChannels.map(({ id }) => id); - - const rolesToDelete: ('from' | 'to')[] = ['from', 'to']; - - const messageChannelMessageAssociationRepository = - await this.twentyORMManager.getRepository( - 'messageChannelMessageAssociation', - ); - - messageChannelMessageAssociationRepository.delete({ - messageChannelId: Any(messageChannelIds), message: { messageParticipants: { handle, From f566d258456dc08ce12dd8dfb1856ed0ade01a37 Mon Sep 17 00:00:00 2001 From: bosiraphael Date: Wed, 7 Aug 2024 16:09:37 +0200 Subject: [PATCH 09/17] remove messageChannelRepository --- .../listeners/messaging-blocklist.listener.ts | 4 - .../message-channel.repository.ts | 198 ------------------ .../messaging-message-list-fetch.cron.job.ts | 20 +- .../messaging-messages-import.cron.job.ts | 17 +- ...age-channel-sync-status-monitoring.cron.ts | 17 +- 5 files changed, 30 insertions(+), 226 deletions(-) delete mode 100644 packages/twenty-server/src/modules/messaging/common/repositories/message-channel.repository.ts diff --git a/packages/twenty-server/src/modules/messaging/blocklist-manager/listeners/messaging-blocklist.listener.ts b/packages/twenty-server/src/modules/messaging/blocklist-manager/listeners/messaging-blocklist.listener.ts index 60433a2902fc..4139c1d07aa9 100644 --- a/packages/twenty-server/src/modules/messaging/blocklist-manager/listeners/messaging-blocklist.listener.ts +++ b/packages/twenty-server/src/modules/messaging/blocklist-manager/listeners/messaging-blocklist.listener.ts @@ -15,9 +15,7 @@ import { BlocklistItemDeleteMessagesJob, BlocklistItemDeleteMessagesJobData, } from 'src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; -import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; @Injectable() export class MessagingBlocklistListener { @@ -27,8 +25,6 @@ export class MessagingBlocklistListener { @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity) private readonly connectedAccountRepository: ConnectedAccountRepository, private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService, - @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) - private readonly messageChannelRepository: MessageChannelRepository, ) {} @OnEvent('blocklist.created') diff --git a/packages/twenty-server/src/modules/messaging/common/repositories/message-channel.repository.ts b/packages/twenty-server/src/modules/messaging/common/repositories/message-channel.repository.ts deleted file mode 100644 index 99b7fbb06b91..000000000000 --- a/packages/twenty-server/src/modules/messaging/common/repositories/message-channel.repository.ts +++ /dev/null @@ -1,198 +0,0 @@ -import { Injectable } from '@nestjs/common'; - -import { EntityManager } from 'typeorm'; - -import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; -import { - MessageChannelSyncStage, - MessageChannelSyncStatus, - MessageChannelWorkspaceEntity, -} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; - -@Injectable() -export class MessageChannelRepository { - constructor( - private readonly workspaceDataSourceService: WorkspaceDataSourceService, - ) {} - - public async getAll( - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - return await this.workspaceDataSourceService.executeRawQuery( - `SELECT * FROM ${dataSourceSchema}."messageChannel"`, - [], - workspaceId, - transactionManager, - ); - } - - public async getByIds( - ids: string[], - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - return await this.workspaceDataSourceService.executeRawQuery( - `SELECT * FROM ${dataSourceSchema}."messageChannel" WHERE "id" = ANY($1)`, - [ids], - workspaceId, - transactionManager, - ); - } - - public async getById( - id: string, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - const messageChannels = - await this.workspaceDataSourceService.executeRawQuery( - `SELECT * FROM ${dataSourceSchema}."messageChannel" WHERE "id" = $1`, - [id], - workspaceId, - transactionManager, - ); - - return messageChannels[0]; - } - - public async updateSyncStatus( - id: string, - syncStatus: MessageChannelSyncStatus, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - const needsToUpdateSyncedAt = - syncStatus === MessageChannelSyncStatus.ACTIVE; - - await this.workspaceDataSourceService.executeRawQuery( - `UPDATE ${dataSourceSchema}."messageChannel" SET "syncStatus" = $1 ${ - needsToUpdateSyncedAt ? `, "syncedAt" = NOW()` : '' - } WHERE "id" = $2`, - [syncStatus, id], - workspaceId, - transactionManager, - ); - } - - public async updateSyncStage( - id: string, - syncStage: MessageChannelSyncStage, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - const needsToUpdateSyncStageStartedAt = - syncStage === MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING || - syncStage === MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING; - - await this.workspaceDataSourceService.executeRawQuery( - `UPDATE ${dataSourceSchema}."messageChannel" SET "syncStage" = $1 ${ - needsToUpdateSyncStageStartedAt ? `, "syncStageStartedAt" = NOW()` : '' - } WHERE "id" = $2`, - [syncStage, id], - workspaceId, - transactionManager, - ); - } - - public async resetSyncStageStartedAt( - id: string, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `UPDATE ${dataSourceSchema}."messageChannel" SET "syncStageStartedAt" = NULL WHERE "id" = $1`, - [id], - workspaceId, - transactionManager, - ); - } - - public async updateLastSyncCursorIfHigher( - id: string, - syncCursor: string, - workspaceId: string, - transactionManager?: EntityManager, - ) { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `UPDATE ${dataSourceSchema}."messageChannel" SET "syncCursor" = $1 - WHERE "id" = $2 - AND ("syncCursor" < $1 OR "syncCursor" = '')`, - [syncCursor, id], - workspaceId, - transactionManager, - ); - } - - public async resetSyncCursor( - id: string, - workspaceId: string, - transactionManager?: EntityManager, - ) { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `UPDATE ${dataSourceSchema}."messageChannel" SET "syncCursor" = '' - WHERE "id" = $1`, - [id], - workspaceId, - transactionManager, - ); - } - - public async incrementThrottleFailureCount( - id: string, - workspaceId: string, - transactionManager?: EntityManager, - ) { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `UPDATE ${dataSourceSchema}."messageChannel" SET "throttleFailureCount" = "throttleFailureCount" + 1 - WHERE "id" = $1`, - [id], - workspaceId, - transactionManager, - ); - } - - public async resetThrottleFailureCount( - id: string, - workspaceId: string, - transactionManager?: EntityManager, - ) { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `UPDATE ${dataSourceSchema}."messageChannel" SET "throttleFailureCount" = 0 - WHERE "id" = $1`, - [id], - workspaceId, - transactionManager, - ); - } -} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts index 7e6f42d978c3..5f52506883ce 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts @@ -1,4 +1,3 @@ -import { Logger } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; @@ -12,8 +11,7 @@ import { Process } from 'src/engine/integrations/message-queue/decorators/proces import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; -import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; +import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; import { MessageChannelSyncStage, MessageChannelWorkspaceEntity, @@ -25,15 +23,12 @@ import { @Processor(MessageQueue.cronQueue) export class MessagingMessageListFetchCronJob { - private readonly logger = new Logger(MessagingMessageListFetchCronJob.name); - constructor( @InjectRepository(Workspace, 'core') private readonly workspaceRepository: Repository, @InjectMessageQueue(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, - @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) - private readonly messageChannelRepository: MessageChannelRepository, + private readonly twentyORMGlobalManager: TwentyORMGlobalManager, ) {} @Process(MessagingMessageListFetchCronJob.name) @@ -45,9 +40,14 @@ export class MessagingMessageListFetchCronJob { }); for (const activeWorkspace of activeWorkspaces) { - const messageChannels = await this.messageChannelRepository.getAll( - activeWorkspace.id, - ); + const messageChannelRepository = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + 'messageChannel', + activeWorkspace.id, + ); + const messageChannels = await messageChannelRepository.find({ + select: ['id'], + }); for (const messageChannel of messageChannels) { if ( diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts index e47e49d852d5..c9f316cd6c2b 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts @@ -12,8 +12,7 @@ import { Process } from 'src/engine/integrations/message-queue/decorators/proces import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; -import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; +import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; import { MessageChannelSyncStage, MessageChannelWorkspaceEntity, @@ -32,8 +31,7 @@ export class MessagingMessagesImportCronJob { private readonly workspaceRepository: Repository, @InjectMessageQueue(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, - @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) - private readonly messageChannelRepository: MessageChannelRepository, + private readonly twentyORMGlobalManager: TwentyORMGlobalManager, ) {} @Process(MessagingMessagesImportCronJob.name) @@ -45,9 +43,14 @@ export class MessagingMessagesImportCronJob { }); for (const activeWorkspace of activeWorkspaces) { - const messageChannels = await this.messageChannelRepository.getAll( - activeWorkspace.id, - ); + const messageChannelRepository = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + 'messageChannel', + activeWorkspace.id, + ); + const messageChannels = await messageChannelRepository.find({ + select: ['id'], + }); for (const messageChannel of messageChannels) { if ( diff --git a/packages/twenty-server/src/modules/messaging/monitoring/crons/jobs/messaging-message-channel-sync-status-monitoring.cron.ts b/packages/twenty-server/src/modules/messaging/monitoring/crons/jobs/messaging-message-channel-sync-status-monitoring.cron.ts index fb7561d20bd0..9d403fa09941 100644 --- a/packages/twenty-server/src/modules/messaging/monitoring/crons/jobs/messaging-message-channel-sync-status-monitoring.cron.ts +++ b/packages/twenty-server/src/modules/messaging/monitoring/crons/jobs/messaging-message-channel-sync-status-monitoring.cron.ts @@ -11,8 +11,7 @@ import { import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; -import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; +import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessagingTelemetryService } from 'src/modules/messaging/monitoring/services/messaging-telemetry.service'; @@ -25,9 +24,8 @@ export class MessagingMessageChannelSyncStatusMonitoringCronJob { constructor( @InjectRepository(Workspace, 'core') private readonly workspaceRepository: Repository, - @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) - private readonly messageChannelRepository: MessageChannelRepository, private readonly messagingTelemetryService: MessagingTelemetryService, + private readonly twentyORMGlobalManager: TwentyORMGlobalManager, ) {} @Process(MessagingMessageChannelSyncStatusMonitoringCronJob.name) @@ -46,9 +44,14 @@ export class MessagingMessageChannelSyncStatusMonitoringCronJob { }); for (const activeWorkspace of activeWorkspaces) { - const messageChannels = await this.messageChannelRepository.getAll( - activeWorkspace.id, - ); + const messageChannelRepository = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + 'messageChannel', + activeWorkspace.id, + ); + const messageChannels = await messageChannelRepository.find({ + select: ['id', 'syncStatus', 'connectedAccountId'], + }); for (const messageChannel of messageChannels) { if (!messageChannel.syncStatus) { From ba31768661478af804ead79023a89e9ea29cdd11 Mon Sep 17 00:00:00 2001 From: bosiraphael Date: Wed, 7 Aug 2024 17:03:54 +0200 Subject: [PATCH 10/17] wip: update MessagingChannelSyncStatusService --- .../metadata-to-repository.mapping.ts | 3 - .../listeners/messaging-blocklist.listener.ts | 33 ++- .../can-access-message-thread.service.ts | 4 - .../messaging-channel-sync-status.service.ts | 202 ++++++++++-------- .../jobs/messaging-message-list-fetch.job.ts | 8 +- .../jobs/messaging-messages-import.job.ts | 8 +- .../messaging-error-handling.service.ts | 5 - .../messaging-messages-import.service.ts | 3 - ...ging-partial-message-list-fetch.service.ts | 4 - ...eate-company-and-contact-after-sync.job.ts | 8 +- 10 files changed, 142 insertions(+), 136 deletions(-) diff --git a/packages/twenty-server/src/engine/object-metadata-repository/metadata-to-repository.mapping.ts b/packages/twenty-server/src/engine/object-metadata-repository/metadata-to-repository.mapping.ts index 28fe7f6fd07d..0de091bbadaa 100644 --- a/packages/twenty-server/src/engine/object-metadata-repository/metadata-to-repository.mapping.ts +++ b/packages/twenty-server/src/engine/object-metadata-repository/metadata-to-repository.mapping.ts @@ -1,7 +1,6 @@ import { BlocklistRepository } from 'src/modules/blocklist/repositories/blocklist.repository'; import { CompanyRepository } from 'src/modules/company/repositories/company.repository'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; import { PersonRepository } from 'src/modules/person/repositories/person.repository'; import { AuditLogRepository } from 'src/modules/timeline/repositiories/audit-log.repository'; import { TimelineActivityRepository } from 'src/modules/timeline/repositiories/timeline-activity.repository'; @@ -12,8 +11,6 @@ export const metadataToRepositoryMapping = { BlocklistWorkspaceEntity: BlocklistRepository, CompanyWorkspaceEntity: CompanyRepository, ConnectedAccountWorkspaceEntity: ConnectedAccountRepository, - - MessageChannelWorkspaceEntity: MessageChannelRepository, PersonWorkspaceEntity: PersonRepository, TimelineActivityWorkspaceEntity: TimelineActivityRepository, WorkspaceMemberWorkspaceEntity: WorkspaceMemberRepository, diff --git a/packages/twenty-server/src/modules/messaging/blocklist-manager/listeners/messaging-blocklist.listener.ts b/packages/twenty-server/src/modules/messaging/blocklist-manager/listeners/messaging-blocklist.listener.ts index 4139c1d07aa9..996aca092466 100644 --- a/packages/twenty-server/src/modules/messaging/blocklist-manager/listeners/messaging-blocklist.listener.ts +++ b/packages/twenty-server/src/modules/messaging/blocklist-manager/listeners/messaging-blocklist.listener.ts @@ -8,6 +8,7 @@ import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decora import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; @@ -16,6 +17,7 @@ import { BlocklistItemDeleteMessagesJobData, } from 'src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job'; import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; +import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; @Injectable() export class MessagingBlocklistListener { @@ -25,6 +27,7 @@ export class MessagingBlocklistListener { @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity) private readonly connectedAccountRepository: ConnectedAccountRepository, private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService, + private readonly twentyORMManager: TwentyORMManager, ) {} @OnEvent('blocklist.created') @@ -57,14 +60,19 @@ export class MessagingBlocklistListener { return; } - const messageChannel = - await this.messageChannelRepository.getByConnectedAccountId( - connectedAccount[0].id, - workspaceId, + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', ); + const messageChannel = await messageChannelRepository.findOneOrFail({ + where: { + connectedAccountId: connectedAccount[0].id, + }, + }); + await this.messagingChannelSyncStatusService.resetAndScheduleFullMessageListFetch( - messageChannel[0].id, + messageChannel.id, workspaceId, ); } @@ -94,14 +102,19 @@ export class MessagingBlocklistListener { return; } - const messageChannel = - await this.messageChannelRepository.getByConnectedAccountId( - connectedAccount[0].id, - workspaceId, + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', ); + const messageChannel = await messageChannelRepository.findOneOrFail({ + where: { + connectedAccountId: connectedAccount[0].id, + }, + }); + await this.messagingChannelSyncStatusService.resetAndScheduleFullMessageListFetch( - messageChannel[0].id, + messageChannel.id, workspaceId, ); } diff --git a/packages/twenty-server/src/modules/messaging/common/query-hooks/message/can-access-message-thread.service.ts b/packages/twenty-server/src/modules/messaging/common/query-hooks/message/can-access-message-thread.service.ts index f79c972c6100..cfeef11c5082 100644 --- a/packages/twenty-server/src/modules/messaging/common/query-hooks/message/can-access-message-thread.service.ts +++ b/packages/twenty-server/src/modules/messaging/common/query-hooks/message/can-access-message-thread.service.ts @@ -5,16 +5,12 @@ import groupBy from 'lodash.groupby'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; -import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { WorkspaceMemberRepository } from 'src/modules/workspace-member/repositories/workspace-member.repository'; import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity'; import { isDefined } from 'src/utils/is-defined'; export class CanAccessMessageThreadService { constructor( - @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) - private readonly messageChannelService: MessageChannelRepository, @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity) private readonly connectedAccountRepository: ConnectedAccountRepository, @InjectObjectMetadataRepository(WorkspaceMemberWorkspaceEntity) diff --git a/packages/twenty-server/src/modules/messaging/common/services/messaging-channel-sync-status.service.ts b/packages/twenty-server/src/modules/messaging/common/services/messaging-channel-sync-status.service.ts index 2b326bc39296..49c52cb8333a 100644 --- a/packages/twenty-server/src/modules/messaging/common/services/messaging-channel-sync-status.service.ts +++ b/packages/twenty-server/src/modules/messaging/common/services/messaging-channel-sync-status.service.ts @@ -3,11 +3,9 @@ import { Injectable } from '@nestjs/common'; import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service'; import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator'; import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum'; -import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { AccountsToReconnectService } from 'src/modules/connected-account/services/accounts-to-reconnect.service'; import { AccountsToReconnectKeys } from 'src/modules/connected-account/types/accounts-to-reconnect-key-value.type'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; import { MessageChannelSyncStage, MessageChannelSyncStatus, @@ -17,44 +15,57 @@ import { @Injectable() export class MessagingChannelSyncStatusService { constructor( - @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) - private readonly messageChannelRepository: MessageChannelRepository, @InjectCacheStorage(CacheStorageNamespace.Messaging) private readonly cacheStorage: CacheStorageService, private readonly twentyORMManager: TwentyORMManager, private readonly accountsToReconnectService: AccountsToReconnectService, ) {} - public async scheduleFullMessageListFetch( - messageChannelId: string, - workspaceId: string, - ) { - await this.messageChannelRepository.updateSyncStage( - messageChannelId, - MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING, - workspaceId, + public async scheduleFullMessageListFetch(messageChannelId: string) { + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); + + await messageChannelRepository.update( + { + id: messageChannelId, + }, + { + syncStage: MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING, + }, ); } - public async schedulePartialMessageListFetch( - messageChannelId: string, - workspaceId: string, - ) { - await this.messageChannelRepository.updateSyncStage( - messageChannelId, - MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING, - workspaceId, + public async schedulePartialMessageListFetch(messageChannelId: string) { + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); + + await messageChannelRepository.update( + { + id: messageChannelId, + }, + { + syncStage: MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING, + }, ); } - public async scheduleMessagesImport( - messageChannelId: string, - workspaceId: string, - ) { - await this.messageChannelRepository.updateSyncStage( - messageChannelId, - MessageChannelSyncStage.MESSAGES_IMPORT_PENDING, - workspaceId, + public async scheduleMessagesImport(messageChannelId: string) { + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); + + await messageChannelRepository.update( + { + id: messageChannelId, + }, + { + syncStage: MessageChannelSyncStage.MESSAGES_IMPORT_PENDING, + }, ); } @@ -66,62 +77,75 @@ export class MessagingChannelSyncStatusService { `messages-to-import:${workspaceId}:gmail:${messageChannelId}`, ); - await this.messageChannelRepository.resetSyncCursor( - messageChannelId, - workspaceId, - ); - - await this.messageChannelRepository.resetSyncStageStartedAt( - messageChannelId, - workspaceId, - ); + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); - await this.messageChannelRepository.resetThrottleFailureCount( - messageChannelId, - workspaceId, + await messageChannelRepository.update( + { + id: messageChannelId, + }, + { + syncCursor: '', + syncStageStartedAt: null, + throttleFailureCount: 0, + }, ); - await this.scheduleFullMessageListFetch(messageChannelId, workspaceId); + await this.scheduleFullMessageListFetch(messageChannelId); } - public async markAsMessagesListFetchOngoing( - messageChannelId: string, - workspaceId: string, - ) { - await this.messageChannelRepository.updateSyncStage( - messageChannelId, - MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING, - workspaceId, - ); + public async markAsMessagesListFetchOngoing(messageChannelId: string) { + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); - await this.messageChannelRepository.updateSyncStatus( - messageChannelId, - MessageChannelSyncStatus.ONGOING, - workspaceId, + await messageChannelRepository.update( + { + id: messageChannelId, + }, + { + syncStage: MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING, + syncStatus: MessageChannelSyncStatus.ONGOING, + }, ); } public async markAsCompletedAndSchedulePartialMessageListFetch( messageChannelId: string, - workspaceId: string, ) { - await this.messageChannelRepository.updateSyncStatus( - messageChannelId, - MessageChannelSyncStatus.ACTIVE, - workspaceId, + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); + + await messageChannelRepository.update( + { + id: messageChannelId, + }, + { + syncStatus: MessageChannelSyncStatus.ACTIVE, + }, ); - await this.schedulePartialMessageListFetch(messageChannelId, workspaceId); + await this.schedulePartialMessageListFetch(messageChannelId); } - public async markAsMessagesImportOngoing( - messageChannelId: string, - workspaceId: string, - ) { - await this.messageChannelRepository.updateSyncStage( - messageChannelId, - MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING, - workspaceId, + public async markAsMessagesImportOngoing(messageChannelId: string) { + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); + + await messageChannelRepository.update( + { + id: messageChannelId, + }, + { + syncStage: MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING, + }, ); } @@ -133,16 +157,19 @@ export class MessagingChannelSyncStatusService { `messages-to-import:${workspaceId}:gmail:${messageChannelId}`, ); - await this.messageChannelRepository.updateSyncStage( - messageChannelId, - MessageChannelSyncStage.FAILED, - workspaceId, - ); + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); - await this.messageChannelRepository.updateSyncStatus( - messageChannelId, - MessageChannelSyncStatus.FAILED_UNKNOWN, - workspaceId, + await messageChannelRepository.update( + { + id: messageChannelId, + }, + { + syncStage: MessageChannelSyncStage.FAILED, + syncStatus: MessageChannelSyncStatus.FAILED_UNKNOWN, + }, ); } @@ -154,16 +181,19 @@ export class MessagingChannelSyncStatusService { `messages-to-import:${workspaceId}:gmail:${messageChannelId}`, ); - await this.messageChannelRepository.updateSyncStage( - messageChannelId, - MessageChannelSyncStage.FAILED, - workspaceId, - ); + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); - await this.messageChannelRepository.updateSyncStatus( - messageChannelId, - MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS, - workspaceId, + await messageChannelRepository.update( + { + id: messageChannelId, + }, + { + syncStage: MessageChannelSyncStage.FAILED, + syncStatus: MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS, + }, ); await this.addToAccountsToReconnect(messageChannelId, workspaceId); diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts index 98dd34d4994f..04ff2bdbcd40 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts @@ -7,11 +7,7 @@ import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repos import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { isThrottled } from 'src/modules/connected-account/utils/is-throttled'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; -import { - MessageChannelSyncStage, - MessageChannelWorkspaceEntity, -} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { MessageChannelSyncStage } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessagingFullMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service'; import { MessagingPartialMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service'; import { MessagingTelemetryService } from 'src/modules/messaging/monitoring/services/messaging-telemetry.service'; @@ -33,8 +29,6 @@ export class MessagingMessageListFetchJob { private readonly messagingPartialMessageListFetchService: MessagingPartialMessageListFetchService, @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity) private readonly connectedAccountRepository: ConnectedAccountRepository, - @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) - private readonly messageChannelRepository: MessageChannelRepository, private readonly messagingTelemetryService: MessagingTelemetryService, ) {} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts index becfa6d548c8..17ff2094f04e 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts @@ -7,11 +7,7 @@ import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repos import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { isThrottled } from 'src/modules/connected-account/utils/is-throttled'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; -import { - MessageChannelSyncStage, - MessageChannelWorkspaceEntity, -} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { MessageChannelSyncStage } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessagingMessagesImportService } from 'src/modules/messaging/message-import-manager/services/messaging-messages-import.service'; import { MessagingTelemetryService } from 'src/modules/messaging/monitoring/services/messaging-telemetry.service'; @@ -29,8 +25,6 @@ export class MessagingMessagesImportJob { @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity) private readonly connectedAccountRepository: ConnectedAccountRepository, private readonly gmailFetchMessageContentFromCacheService: MessagingMessagesImportService, - @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) - private readonly messageChannelRepository: MessageChannelRepository, private readonly messagingTelemetryService: MessagingTelemetryService, ) {} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-error-handling.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-error-handling.service.ts index 52585e3736a1..3b01650966ad 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-error-handling.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-error-handling.service.ts @@ -5,7 +5,6 @@ import snakeCase from 'lodash.snakecase'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MESSAGING_THROTTLE_MAX_ATTEMPTS } from 'src/modules/messaging/message-import-manager/constants/messaging-throttle-max-attempts'; @@ -28,8 +27,6 @@ export class MessagingErrorHandlingService { private readonly connectedAccountRepository: ConnectedAccountRepository, private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService, private readonly messagingTelemetryService: MessagingTelemetryService, - @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) - private readonly messageChannelRepository: MessageChannelRepository, ) {} public async handleGmailError( @@ -263,14 +260,12 @@ export class MessagingErrorHandlingService { case 'full-message-list-fetch': await this.messagingChannelSyncStatusService.scheduleFullMessageListFetch( messageChannel.id, - workspaceId, ); break; case 'partial-message-list-fetch': await this.messagingChannelSyncStatusService.schedulePartialMessageListFetch( messageChannel.id, - workspaceId, ); break; diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts index d39b507545ef..031ef59324c9 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts @@ -12,7 +12,6 @@ import { EmailAliasManagerService } from 'src/modules/connected-account/email-al import { RefreshAccessTokenService } from 'src/modules/connected-account/refresh-access-token-manager/services/refresh-access-token.service'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; import { MessageChannelSyncStage, @@ -40,8 +39,6 @@ export class MessagingMessagesImportService { private readonly messagingTelemetryService: MessagingTelemetryService, @InjectObjectMetadataRepository(BlocklistWorkspaceEntity) private readonly blocklistRepository: BlocklistRepository, - @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) - private readonly messageChannelRepository: MessageChannelRepository, private readonly emailAliasManagerService: EmailAliasManagerService, private readonly isFeatureEnabledService: IsFeatureEnabledService, @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity) diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts index a186f7615ed2..4703a15cbab4 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts @@ -6,10 +6,8 @@ import { Any } from 'typeorm'; import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service'; import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator'; import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum'; -import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; @@ -26,8 +24,6 @@ export class MessagingPartialMessageListFetchService { constructor( private readonly gmailClientProvider: MessagingGmailClientProvider, - @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) - private readonly messageChannelRepository: MessageChannelRepository, @InjectCacheStorage(CacheStorageNamespace.Messaging) private readonly cacheStorage: CacheStorageService, private readonly gmailErrorHandlingService: MessagingErrorHandlingService, diff --git a/packages/twenty-server/src/modules/messaging/message-participant-manager/jobs/messaging-create-company-and-contact-after-sync.job.ts b/packages/twenty-server/src/modules/messaging/message-participant-manager/jobs/messaging-create-company-and-contact-after-sync.job.ts index 06cd7321cb8d..ce336e5520be 100644 --- a/packages/twenty-server/src/modules/messaging/message-participant-manager/jobs/messaging-create-company-and-contact-after-sync.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-participant-manager/jobs/messaging-create-company-and-contact-after-sync.job.ts @@ -10,11 +10,7 @@ import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { CreateCompanyAndContactService } from 'src/modules/contact-creation-manager/services/create-company-and-contact.service'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; -import { - MessageChannelContactAutoCreationPolicy, - MessageChannelWorkspaceEntity, -} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { MessageChannelContactAutoCreationPolicy } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessageParticipantWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-participant.workspace-entity'; export type MessagingCreateCompanyAndContactAfterSyncJobData = { @@ -29,8 +25,6 @@ export class MessagingCreateCompanyAndContactAfterSyncJob { ); constructor( private readonly createCompanyAndContactService: CreateCompanyAndContactService, - @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) - private readonly messageChannelService: MessageChannelRepository, @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity) private readonly connectedAccountRepository: ConnectedAccountRepository, private readonly twentyORMManager: TwentyORMManager, From 13d1dc0b88c8221a8ca5cea4ec34e46e2d07d030 Mon Sep 17 00:00:00 2001 From: bosiraphael Date: Wed, 7 Aug 2024 17:10:50 +0200 Subject: [PATCH 11/17] update CanAccessMessageThreadService --- .../can-access-message-thread.service.ts | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/packages/twenty-server/src/modules/messaging/common/query-hooks/message/can-access-message-thread.service.ts b/packages/twenty-server/src/modules/messaging/common/query-hooks/message/can-access-message-thread.service.ts index cfeef11c5082..44141f3dde92 100644 --- a/packages/twenty-server/src/modules/messaging/common/query-hooks/message/can-access-message-thread.service.ts +++ b/packages/twenty-server/src/modules/messaging/common/query-hooks/message/can-access-message-thread.service.ts @@ -1,10 +1,13 @@ import { ForbiddenException } from '@nestjs/common'; import groupBy from 'lodash.groupby'; +import { Any } from 'typeorm'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; +import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { WorkspaceMemberRepository } from 'src/modules/workspace-member/repositories/workspace-member.repository'; import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity'; import { isDefined } from 'src/utils/is-defined'; @@ -15,6 +18,7 @@ export class CanAccessMessageThreadService { private readonly connectedAccountRepository: ConnectedAccountRepository, @InjectObjectMetadataRepository(WorkspaceMemberWorkspaceEntity) private readonly workspaceMemberRepository: WorkspaceMemberRepository, + private readonly twentyORMManager: TwentyORMManager, ) {} public async canAccessMessageThread( @@ -22,12 +26,19 @@ export class CanAccessMessageThreadService { workspaceId: string, messageChannelMessageAssociations: any[], ) { - const messageChannels = await this.messageChannelService.getByIds( - messageChannelMessageAssociations.map( - (association) => association.messageChannelId, - ), - workspaceId, - ); + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); + const messageChannels = await messageChannelRepository.find({ + where: { + id: Any( + messageChannelMessageAssociations.map( + (association) => association.messageChannelId, + ), + ), + }, + }); const messageChannelsGroupByVisibility = groupBy( messageChannels, From a3389c82952d6d3b04a1e12e13807a1ab215c923 Mon Sep 17 00:00:00 2001 From: bosiraphael Date: Wed, 7 Aug 2024 17:13:07 +0200 Subject: [PATCH 12/17] update jobs --- .../jobs/messaging-message-list-fetch.job.ts | 21 ++++++++++++++----- .../jobs/messaging-messages-import.job.ts | 21 ++++++++++++++----- 2 files changed, 32 insertions(+), 10 deletions(-) diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts index 04ff2bdbcd40..09854fbf4f3a 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts @@ -4,10 +4,14 @@ import { Process } from 'src/engine/integrations/message-queue/decorators/proces import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { isThrottled } from 'src/modules/connected-account/utils/is-throttled'; -import { MessageChannelSyncStage } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { + MessageChannelSyncStage, + MessageChannelWorkspaceEntity, +} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessagingFullMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service'; import { MessagingPartialMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service'; import { MessagingTelemetryService } from 'src/modules/messaging/monitoring/services/messaging-telemetry.service'; @@ -30,6 +34,7 @@ export class MessagingMessageListFetchJob { @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity) private readonly connectedAccountRepository: ConnectedAccountRepository, private readonly messagingTelemetryService: MessagingTelemetryService, + private readonly twentyORMManager: TwentyORMManager, ) {} @Process(MessagingMessageListFetchJob.name) @@ -42,10 +47,16 @@ export class MessagingMessageListFetchJob { workspaceId, }); - const messageChannel = await this.messageChannelRepository.getById( - messageChannelId, - workspaceId, - ); + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); + + const messageChannel = await messageChannelRepository.findOne({ + where: { + id: messageChannelId, + }, + }); if (!messageChannel) { await this.messagingTelemetryService.track({ diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts index 17ff2094f04e..935be741e363 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts @@ -4,10 +4,14 @@ import { Process } from 'src/engine/integrations/message-queue/decorators/proces import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { isThrottled } from 'src/modules/connected-account/utils/is-throttled'; -import { MessageChannelSyncStage } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { + MessageChannelSyncStage, + MessageChannelWorkspaceEntity, +} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessagingMessagesImportService } from 'src/modules/messaging/message-import-manager/services/messaging-messages-import.service'; import { MessagingTelemetryService } from 'src/modules/messaging/monitoring/services/messaging-telemetry.service'; @@ -26,6 +30,7 @@ export class MessagingMessagesImportJob { private readonly connectedAccountRepository: ConnectedAccountRepository, private readonly gmailFetchMessageContentFromCacheService: MessagingMessagesImportService, private readonly messagingTelemetryService: MessagingTelemetryService, + private readonly twentyORMManager: TwentyORMManager, ) {} @Process(MessagingMessagesImportJob.name) @@ -38,10 +43,16 @@ export class MessagingMessagesImportJob { messageChannelId, }); - const messageChannel = await this.messageChannelRepository.getById( - messageChannelId, - workspaceId, - ); + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); + + const messageChannel = await messageChannelRepository.findOne({ + where: { + id: messageChannelId, + }, + }); if (!messageChannel) { await this.messagingTelemetryService.track({ From 52b9558f524c032b4766deedfd497a1aef1b5df8 Mon Sep 17 00:00:00 2001 From: bosiraphael Date: Wed, 7 Aug 2024 17:42:54 +0200 Subject: [PATCH 13/17] replace all calls to messageChannelRepository --- .../engine/core-modules/auth/auth.module.ts | 2 - .../messaging-query-hook.module.ts | 2 - .../gmail/messaging-gmail-driver.module.ts | 2 - .../jobs/messaging-ongoing-stale.job.ts | 2 - .../messaging-error-handling.service.ts | 17 ++++-- ...ssaging-full-message-list-fetch.service.ts | 54 +++++++++++++------ .../messaging-messages-import.service.ts | 25 +++++---- ...ging-partial-message-list-fetch.service.ts | 39 ++++++++------ ...eate-company-and-contact-after-sync.job.ts | 21 +++++--- 9 files changed, 103 insertions(+), 61 deletions(-) diff --git a/packages/twenty-server/src/engine/core-modules/auth/auth.module.ts b/packages/twenty-server/src/engine/core-modules/auth/auth.module.ts index 064cb7aad123..62ca6a411053 100644 --- a/packages/twenty-server/src/engine/core-modules/auth/auth.module.ts +++ b/packages/twenty-server/src/engine/core-modules/auth/auth.module.ts @@ -30,7 +30,6 @@ import { WorkspaceManagerModule } from 'src/engine/workspace-manager/workspace-m import { CalendarChannelWorkspaceEntity } from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity'; import { ConnectedAccountModule } from 'src/modules/connected-account/connected-account.module'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; -import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { AuthResolver } from './auth.resolver'; @@ -51,7 +50,6 @@ import { JwtAuthStrategy } from './strategies/jwt.auth.strategy'; ), ObjectMetadataRepositoryModule.forFeature([ ConnectedAccountWorkspaceEntity, - MessageChannelWorkspaceEntity, ]), HttpModule, UserWorkspaceModule, diff --git a/packages/twenty-server/src/modules/messaging/common/query-hooks/messaging-query-hook.module.ts b/packages/twenty-server/src/modules/messaging/common/query-hooks/messaging-query-hook.module.ts index a6e369c9bc3b..9c84640768af 100644 --- a/packages/twenty-server/src/modules/messaging/common/query-hooks/messaging-query-hook.module.ts +++ b/packages/twenty-server/src/modules/messaging/common/query-hooks/messaging-query-hook.module.ts @@ -5,13 +5,11 @@ import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/s import { CanAccessMessageThreadService } from 'src/modules/messaging/common/query-hooks/message/can-access-message-thread.service'; import { MessageFindManyPreQueryHook } from 'src/modules/messaging/common/query-hooks/message/message-find-many.pre-query.hook'; import { MessageFindOnePreQueryHook } from 'src/modules/messaging/common/query-hooks/message/message-find-one.pre-query-hook'; -import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity'; @Module({ imports: [ ObjectMetadataRepositoryModule.forFeature([ - MessageChannelWorkspaceEntity, ConnectedAccountWorkspaceEntity, WorkspaceMemberWorkspaceEntity, ]), diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/messaging-gmail-driver.module.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/messaging-gmail-driver.module.ts index 4c4270f91685..ac5bf9ea62ba 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/messaging-gmail-driver.module.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/messaging-gmail-driver.module.ts @@ -12,7 +12,6 @@ import { EmailAliasManagerModule } from 'src/modules/connected-account/email-ali import { OAuth2ClientManagerModule } from 'src/modules/connected-account/oauth2-client-manager/oauth2-client-manager.module'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { MessagingCommonModule } from 'src/modules/messaging/common/messaging-common.module'; -import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessagingGmailClientProvider } from 'src/modules/messaging/message-import-manager/drivers/gmail/providers/messaging-gmail-client.provider'; import { MessagingGmailFetchByBatchesService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-by-batch.service'; import { MessagingGmailFetchMessagesByBatchesService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-messages-by-batches.service'; @@ -28,7 +27,6 @@ import { MessageParticipantManagerModule } from 'src/modules/messaging/message-p EnvironmentModule, ObjectMetadataRepositoryModule.forFeature([ ConnectedAccountWorkspaceEntity, - MessageChannelWorkspaceEntity, BlocklistWorkspaceEntity, ]), MessagingCommonModule, diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts index 45e2b4ec264c..a2839cdd92c0 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts @@ -59,13 +59,11 @@ export class MessagingOngoingStaleJob { case MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING: await this.messagingChannelSyncStatusService.schedulePartialMessageListFetch( messageChannel.id, - workspaceId, ); break; case MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING: await this.messagingChannelSyncStatusService.scheduleMessagesImport( messageChannel.id, - workspaceId, ); break; default: diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-error-handling.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-error-handling.service.ts index 3b01650966ad..2a6570961ce6 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-error-handling.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-error-handling.service.ts @@ -3,6 +3,7 @@ import { Injectable } from '@nestjs/common'; import snakeCase from 'lodash.snakecase'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; @@ -27,6 +28,7 @@ export class MessagingErrorHandlingService { private readonly connectedAccountRepository: ConnectedAccountRepository, private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService, private readonly messagingTelemetryService: MessagingTelemetryService, + private readonly twentyORMManager: TwentyORMManager, ) {} public async handleGmailError( @@ -272,7 +274,6 @@ export class MessagingErrorHandlingService { case 'messages-import': await this.messagingChannelSyncStatusService.scheduleMessagesImport( messageChannel.id, - workspaceId, ); break; @@ -285,9 +286,17 @@ export class MessagingErrorHandlingService { messageChannel: MessageChannelWorkspaceEntity, workspaceId: string, ): Promise { - await this.messageChannelRepository.incrementThrottleFailureCount( - messageChannel.id, - workspaceId, + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); + + await messageChannelRepository.increment( + { + id: messageChannel.id, + }, + 'throttleFailureCount', + 1, ); await this.messagingTelemetryService.track({ diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts index bda33157baaa..dde37698e9c8 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts @@ -7,10 +7,8 @@ import { Any, EntityManager } from 'typeorm'; import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service'; import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator'; import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum'; -import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; -import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; @@ -31,8 +29,6 @@ export class MessagingFullMessageListFetchService { constructor( private readonly gmailClientProvider: MessagingGmailClientProvider, - @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) - private readonly messageChannelRepository: MessageChannelRepository, @InjectCacheStorage(CacheStorageNamespace.Messaging) private readonly cacheStorage: CacheStorageService, private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService, @@ -47,7 +43,6 @@ export class MessagingFullMessageListFetchService { ) { await this.messagingChannelSyncStatusService.markAsMessagesListFetchOngoing( messageChannel.id, - workspaceId, ); const gmailClient: gmail_v1.Gmail = @@ -70,19 +65,23 @@ export class MessagingFullMessageListFetchService { return; } - await this.messageChannelRepository.resetThrottleFailureCount( - messageChannel.id, - workspaceId, - ); + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); - await this.messageChannelRepository.resetSyncStageStartedAt( - messageChannel.id, - workspaceId, + await messageChannelRepository.update( + { + id: messageChannel.id, + }, + { + throttleFailureCount: 0, + syncStageStartedAt: null, + }, ); await this.messagingChannelSyncStatusService.scheduleMessagesImport( messageChannel.id, - workspaceId, ); } @@ -216,11 +215,32 @@ export class MessagingFullMessageListFetchService { ); } - await this.messageChannelRepository.updateLastSyncCursorIfHigher( - messageChannelId, - historyId, - workspaceId, + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); + + const messageChannel = await messageChannelRepository.findOneOrFail( + { + where: { + id: messageChannelId, + }, + }, transactionManager, ); + + const currentSyncCursor = messageChannel.syncCursor; + + if (!currentSyncCursor || historyId > currentSyncCursor) { + await messageChannelRepository.update( + { + id: messageChannel.id, + }, + { + syncCursor: historyId, + }, + transactionManager, + ); + } } } diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts index 031ef59324c9..89fcedb6d874 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts @@ -6,6 +6,7 @@ import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator'; import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { BlocklistRepository } from 'src/modules/blocklist/repositories/blocklist.repository'; import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity'; import { EmailAliasManagerService } from 'src/modules/connected-account/email-alias-manager/services/email-alias-manager.service'; @@ -43,6 +44,7 @@ export class MessagingMessagesImportService { private readonly isFeatureEnabledService: IsFeatureEnabledService, @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity) private readonly connectedAccountRepository: ConnectedAccountRepository, + private readonly twentyORMManager: TwentyORMManager, ) {} async processMessageBatchImport( @@ -70,7 +72,6 @@ export class MessagingMessagesImportService { await this.messagingChannelSyncStatusService.markAsMessagesImportOngoing( messageChannel.id, - workspaceId, ); let accessToken: string; @@ -136,7 +137,6 @@ export class MessagingMessagesImportService { if (!messageIdsToFetch?.length) { await this.messagingChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch( messageChannel.id, - workspaceId, ); return await this.trackMessageImportCompleted( @@ -177,23 +177,26 @@ export class MessagingMessagesImportService { ) { await this.messagingChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch( messageChannel.id, - workspaceId, ); } else { await this.messagingChannelSyncStatusService.scheduleMessagesImport( messageChannel.id, - workspaceId, ); } - await this.messageChannelRepository.resetThrottleFailureCount( - messageChannel.id, - workspaceId, - ); + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); - await this.messageChannelRepository.resetSyncStageStartedAt( - messageChannel.id, - workspaceId, + await messageChannelRepository.update( + { + id: messageChannel.id, + }, + { + throttleFailureCount: 0, + syncStageStartedAt: null, + }, ); return await this.trackMessageImportCompleted( diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts index 4703a15cbab4..7ab5f5c65250 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts @@ -40,7 +40,6 @@ export class MessagingPartialMessageListFetchService { ): Promise { await this.messagingChannelSyncStatusService.markAsMessagesListFetchOngoing( messageChannel.id, - workspaceId, ); const lastSyncHistoryId = messageChannel.syncCursor; @@ -65,14 +64,19 @@ export class MessagingPartialMessageListFetchService { return; } - await this.messageChannelRepository.resetThrottleFailureCount( - messageChannel.id, - workspaceId, - ); + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); - await this.messageChannelRepository.resetSyncStageStartedAt( - messageChannel.id, - workspaceId, + await messageChannelRepository.update( + { + id: messageChannel.id, + }, + { + throttleFailureCount: 0, + syncStageStartedAt: null, + }, ); if (!historyId) { @@ -88,7 +92,6 @@ export class MessagingPartialMessageListFetchService { await this.messagingChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch( messageChannel.id, - workspaceId, ); return; @@ -143,15 +146,21 @@ export class MessagingPartialMessageListFetchService { `Deleted ${messagesDeleted.length} messages for workspace ${workspaceId} and account ${connectedAccount.id}`, ); - await this.messageChannelRepository.updateLastSyncCursorIfHigher( - messageChannel.id, - historyId, - workspaceId, - ); + const currentSyncCursor = messageChannel.syncCursor; + + if (!currentSyncCursor || historyId > currentSyncCursor) { + await messageChannelRepository.update( + { + id: messageChannel.id, + }, + { + syncCursor: historyId, + }, + ); + } await this.messagingChannelSyncStatusService.scheduleMessagesImport( messageChannel.id, - workspaceId, ); } } diff --git a/packages/twenty-server/src/modules/messaging/message-participant-manager/jobs/messaging-create-company-and-contact-after-sync.job.ts b/packages/twenty-server/src/modules/messaging/message-participant-manager/jobs/messaging-create-company-and-contact-after-sync.job.ts index ce336e5520be..366929372a44 100644 --- a/packages/twenty-server/src/modules/messaging/message-participant-manager/jobs/messaging-create-company-and-contact-after-sync.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-participant-manager/jobs/messaging-create-company-and-contact-after-sync.job.ts @@ -10,7 +10,10 @@ import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { CreateCompanyAndContactService } from 'src/modules/contact-creation-manager/services/create-company-and-contact.service'; -import { MessageChannelContactAutoCreationPolicy } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { + MessageChannelContactAutoCreationPolicy, + MessageChannelWorkspaceEntity, +} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessageParticipantWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-participant.workspace-entity'; export type MessagingCreateCompanyAndContactAfterSyncJobData = { @@ -39,12 +42,18 @@ export class MessagingCreateCompanyAndContactAfterSyncJob { ); const { workspaceId, messageChannelId } = data; - const messageChannel = await this.messageChannelService.getByIds( - [messageChannelId], - workspaceId, - ); + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); + + const messageChannel = await messageChannelRepository.findOneOrFail({ + where: { + id: messageChannelId, + }, + }); - const { contactAutoCreationPolicy, connectedAccountId } = messageChannel[0]; + const { contactAutoCreationPolicy, connectedAccountId } = messageChannel; if ( contactAutoCreationPolicy === MessageChannelContactAutoCreationPolicy.NONE From fa57b100f6ebb2e6dc12738a9e730900635f25fc Mon Sep 17 00:00:00 2001 From: bosiraphael Date: Wed, 7 Aug 2024 17:54:50 +0200 Subject: [PATCH 14/17] fix arguments in wrong order --- .../crons/jobs/messaging-messages-import.cron.job.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts index c9f316cd6c2b..f1dcfd074e4b 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts @@ -45,8 +45,8 @@ export class MessagingMessagesImportCronJob { for (const activeWorkspace of activeWorkspaces) { const messageChannelRepository = await this.twentyORMGlobalManager.getRepositoryForWorkspace( - 'messageChannel', activeWorkspace.id, + 'messageChannel', ); const messageChannels = await messageChannelRepository.find({ select: ['id'], From eaa89f225af7b20caac23b5bb5c834014cc51576 Mon Sep 17 00:00:00 2001 From: bosiraphael Date: Wed, 7 Aug 2024 18:11:15 +0200 Subject: [PATCH 15/17] fix --- .../crons/jobs/messaging-message-list-fetch.cron.job.ts | 3 ++- .../crons/jobs/messaging-messages-import.cron.job.ts | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts index 5f52506883ce..0eb06acc5ec1 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts @@ -42,9 +42,10 @@ export class MessagingMessageListFetchCronJob { for (const activeWorkspace of activeWorkspaces) { const messageChannelRepository = await this.twentyORMGlobalManager.getRepositoryForWorkspace( - 'messageChannel', activeWorkspace.id, + 'messageChannel', ); + const messageChannels = await messageChannelRepository.find({ select: ['id'], }); diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts index f1dcfd074e4b..e3da8b05bbc9 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts @@ -48,8 +48,9 @@ export class MessagingMessagesImportCronJob { activeWorkspace.id, 'messageChannel', ); + const messageChannels = await messageChannelRepository.find({ - select: ['id'], + select: ['id', 'isSyncEnabled', 'syncStage'], }); for (const messageChannel of messageChannels) { From a1c4868a1463b11075db6d393ab9f2933fdeb562 Mon Sep 17 00:00:00 2001 From: bosiraphael Date: Wed, 7 Aug 2024 18:33:19 +0200 Subject: [PATCH 16/17] merge main --- .../src/modules/messaging/common/messaging-common.module.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/packages/twenty-server/src/modules/messaging/common/messaging-common.module.ts b/packages/twenty-server/src/modules/messaging/common/messaging-common.module.ts index 4a2d1a867099..8f570152438a 100644 --- a/packages/twenty-server/src/modules/messaging/common/messaging-common.module.ts +++ b/packages/twenty-server/src/modules/messaging/common/messaging-common.module.ts @@ -2,16 +2,13 @@ import { Module } from '@nestjs/common'; import { TypeOrmModule } from '@nestjs/typeorm'; import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity'; -import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module'; import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module'; import { ConnectedAccountModule } from 'src/modules/connected-account/connected-account.module'; import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; -import { PersonWorkspaceEntity } from 'src/modules/person/standard-objects/person.workspace-entity'; @Module({ imports: [ WorkspaceDataSourceModule, - ObjectMetadataRepositoryModule.forFeature([PersonWorkspaceEntity]), TypeOrmModule.forFeature([FeatureFlagEntity], 'core'), ConnectedAccountModule, ], From f28b3b056d1a0f1cfa55c7821330e63ba2766ae6 Mon Sep 17 00:00:00 2001 From: bosiraphael Date: Tue, 13 Aug 2024 18:31:27 +0200 Subject: [PATCH 17/17] fix linking company to person in create-company-and-contact.service --- .../contact-creation-manager/services/create-company.service.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/twenty-server/src/modules/contact-creation-manager/services/create-company.service.ts b/packages/twenty-server/src/modules/contact-creation-manager/services/create-company.service.ts index 6e9f3c8956c8..bc3c71c9aae1 100644 --- a/packages/twenty-server/src/modules/contact-creation-manager/services/create-company.service.ts +++ b/packages/twenty-server/src/modules/contact-creation-manager/services/create-company.service.ts @@ -157,7 +157,7 @@ export class CreateCompanyService { }; } - private async createCompanyMap(companies: CompanyWorkspaceEntity[]) { + private createCompanyMap(companies: CompanyWorkspaceEntity[]) { return companies.reduce( (acc, company) => { if (!company.domainName) {