diff --git a/packages/twenty-docker/.env.example b/packages/twenty-docker/.env.example
index 3999fc484cef..d7ab15672a9b 100644
--- a/packages/twenty-docker/.env.example
+++ b/packages/twenty-docker/.env.example
@@ -21,3 +21,5 @@ STORAGE_TYPE=local
# STORAGE_S3_REGION=eu-west3
# STORAGE_S3_NAME=my-bucket
# STORAGE_S3_ENDPOINT=
+
+MESSAGE_QUEUE_TYPE=pg-boss
diff --git a/packages/twenty-docker/docker-compose.yml b/packages/twenty-docker/docker-compose.yml
index 70ef4ceef94e..1ca289c9af5d 100644
--- a/packages/twenty-docker/docker-compose.yml
+++ b/packages/twenty-docker/docker-compose.yml
@@ -13,6 +13,7 @@ services:
PG_DATABASE_URL: postgres://twenty:twenty@${PG_DATABASE_HOST}/default
SERVER_URL: ${SERVER_URL}
FRONT_BASE_URL: ${FRONT_BASE_URL:-$SERVER_URL}
+ MESSAGE_QUEUE_TYPE: ${MESSAGE_QUEUE_TYPE}
ENABLE_DB_MIGRATIONS: "true"
@@ -35,6 +36,32 @@ services:
retries: 10
restart: always
+ worker:
+ image: twentycrm/twenty:${TAG}
+ volumes:
+ - worker-local-data:/app/${STORAGE_LOCAL_PATH:-.local-storage}
+ command: ["yarn", "worker:prod"]
+ environment:
+ PG_DATABASE_URL: postgres://twenty:twenty@${PG_DATABASE_HOST}/default
+ SERVER_URL: ${SERVER_URL}
+ FRONT_BASE_URL: ${FRONT_BASE_URL:-$SERVER_URL}
+ MESSAGE_QUEUE_TYPE: ${MESSAGE_QUEUE_TYPE}
+
+ ENABLE_DB_MIGRATIONS: "true"
+
+ STORAGE_TYPE: ${STORAGE_TYPE}
+ STORAGE_S3_REGION: ${STORAGE_S3_REGION}
+ STORAGE_S3_NAME: ${STORAGE_S3_NAME}
+ STORAGE_S3_ENDPOINT: ${STORAGE_S3_ENDPOINT}
+ ACCESS_TOKEN_SECRET: ${ACCESS_TOKEN_SECRET}
+ LOGIN_TOKEN_SECRET: ${LOGIN_TOKEN_SECRET}
+ REFRESH_TOKEN_SECRET: ${REFRESH_TOKEN_SECRET}
+ FILE_TOKEN_SECRET: ${FILE_TOKEN_SECRET}
+ depends_on:
+ db:
+ condition: service_healthy
+ restart: always
+
db:
image: twentycrm/twenty-postgres:${TAG}
volumes:
@@ -51,3 +78,4 @@ services:
volumes:
db-data:
server-local-data:
+ worker-local-data:
diff --git a/packages/twenty-docs/docs/start/self-hosting/self-hosting.mdx b/packages/twenty-docs/docs/start/self-hosting/self-hosting.mdx
index 3c075174254d..a6ea99e0d010 100644
--- a/packages/twenty-docs/docs/start/self-hosting/self-hosting.mdx
+++ b/packages/twenty-docs/docs/start/self-hosting/self-hosting.mdx
@@ -17,6 +17,15 @@ import TabItem from '@theme/TabItem';
+# Setup Messaging & Calendar sync
+
+Twenty offers integrations with Gmail and Google Calendar. To enable these features, you need to connect to register the following recurring jobs:
+```
+# from your worker container
+yarn command:prod cron:messaging:gmail-messages-import
+yarn command:prod cron:messaging:gmail-message-list-fetch
+```
+
# Setup Environment Variables
## Frontend
@@ -209,3 +218,4 @@ import TabItem from '@theme/TabItem';
['CAPTCHA_SITE_KEY', '', 'The captcha site key'],
['CAPTCHA_SECRET_KEY', '', 'The captcha secret key'],
]}>
+
diff --git a/packages/twenty-server/src/database/typeorm-seeds/workspace/message-channels.ts b/packages/twenty-server/src/database/typeorm-seeds/workspace/message-channels.ts
index 70bc1a638a81..a44890197ff9 100644
--- a/packages/twenty-server/src/database/typeorm-seeds/workspace/message-channels.ts
+++ b/packages/twenty-server/src/database/typeorm-seeds/workspace/message-channels.ts
@@ -43,7 +43,7 @@ export const seedMessageChannel = async (
handle: 'tim@apple.dev',
visibility: 'share_everything',
syncSubStatus:
- MessageChannelSyncSubStatus.FULL_MESSAGES_LIST_FETCH_PENDING,
+ MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING,
},
{
id: DEV_SEED_MESSAGE_CHANNEL_IDS.JONY,
@@ -56,7 +56,7 @@ export const seedMessageChannel = async (
handle: 'jony.ive@apple.dev',
visibility: 'share_everything',
syncSubStatus:
- MessageChannelSyncSubStatus.FULL_MESSAGES_LIST_FETCH_PENDING,
+ MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING,
},
{
id: DEV_SEED_MESSAGE_CHANNEL_IDS.PHIL,
@@ -69,7 +69,7 @@ export const seedMessageChannel = async (
handle: 'phil.schiler@apple.dev',
visibility: 'share_everything',
syncSubStatus:
- MessageChannelSyncSubStatus.FULL_MESSAGES_LIST_FETCH_PENDING,
+ MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING,
},
])
.execute();
diff --git a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/listeners/telemetry.listener.ts b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/listeners/telemetry.listener.ts
index 1a28c27bbfe1..6fa2751ecfbb 100644
--- a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/listeners/telemetry.listener.ts
+++ b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/listeners/telemetry.listener.ts
@@ -2,7 +2,6 @@ import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { AnalyticsService } from 'src/engine/core-modules/analytics/analytics.service';
-import { CreateAnalyticsInput } from 'src/engine/core-modules/analytics/dto/create-analytics.input';
import { ObjectRecordCreateEvent } from 'src/engine/integrations/event-emitter/types/object-record-create.event';
@Injectable()
@@ -11,14 +10,13 @@ export class TelemetryListener {
@OnEvent('*.created')
async handleAllCreate(payload: ObjectRecordCreateEvent) {
- this.analyticsService.create(
+ await this.analyticsService.create(
{
type: 'track',
- name: payload.name,
- data: JSON.parse(`{
- "eventName": "${payload.name}"
- }`),
- } as CreateAnalyticsInput,
+ data: {
+ eventName: payload.name,
+ },
+ },
payload.userId,
payload.workspaceId,
'', // voluntarely not retrieving this
diff --git a/packages/twenty-server/src/engine/core-modules/analytics/analytics.resolver.ts b/packages/twenty-server/src/engine/core-modules/analytics/analytics.resolver.ts
index 7f3e710fcfab..d0c229f27346 100644
--- a/packages/twenty-server/src/engine/core-modules/analytics/analytics.resolver.ts
+++ b/packages/twenty-server/src/engine/core-modules/analytics/analytics.resolver.ts
@@ -12,7 +12,7 @@ import { User } from 'src/engine/core-modules/user/user.entity';
import { AnalyticsService } from './analytics.service';
import { Analytics } from './analytics.entity';
-import { CreateAnalyticsInput } from './dto/create-analytics.input';
+import { CreateAnalyticsInput } from './dtos/create-analytics.input';
@UseGuards(OptionalJwtAuthGuard)
@Resolver(() => Analytics)
diff --git a/packages/twenty-server/src/engine/core-modules/analytics/analytics.service.ts b/packages/twenty-server/src/engine/core-modules/analytics/analytics.service.ts
index 0f3de7b9fd71..07c0ed97b717 100644
--- a/packages/twenty-server/src/engine/core-modules/analytics/analytics.service.ts
+++ b/packages/twenty-server/src/engine/core-modules/analytics/analytics.service.ts
@@ -4,7 +4,10 @@ import { HttpService } from '@nestjs/axios';
import { anonymize } from 'src/utils/anonymize';
import { EnvironmentService } from 'src/engine/integrations/environment/environment.service';
-import { CreateAnalyticsInput } from './dto/create-analytics.input';
+type CreateEventInput = {
+ type: string;
+ data: object;
+};
@Injectable()
export class AnalyticsService {
@@ -16,7 +19,7 @@ export class AnalyticsService {
) {}
async create(
- createEventInput: CreateAnalyticsInput,
+ createEventInput: CreateEventInput,
userId: string | undefined,
workspaceId: string | undefined,
workspaceDisplayName: string | undefined,
diff --git a/packages/twenty-server/src/engine/core-modules/analytics/dto/create-analytics.input.ts b/packages/twenty-server/src/engine/core-modules/analytics/dtos/create-analytics.input.ts
similarity index 100%
rename from packages/twenty-server/src/engine/core-modules/analytics/dto/create-analytics.input.ts
rename to packages/twenty-server/src/engine/core-modules/analytics/dtos/create-analytics.input.ts
diff --git a/packages/twenty-server/src/modules/connected-account/repositories/connected-account.repository.ts b/packages/twenty-server/src/modules/connected-account/repositories/connected-account.repository.ts
index 0f94f2861b0d..533da3fdbd6f 100644
--- a/packages/twenty-server/src/modules/connected-account/repositories/connected-account.repository.ts
+++ b/packages/twenty-server/src/modules/connected-account/repositories/connected-account.repository.ts
@@ -270,14 +270,6 @@ export class ConnectedAccountRepository {
);
}
- const refreshToken = connectedAccount.refreshToken;
-
- if (!refreshToken) {
- throw new Error(
- `No refresh token found for connected account ${connectedAccountId} in workspace ${workspaceId}`,
- );
- }
-
return connectedAccount;
}
}
diff --git a/packages/twenty-server/src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.module.ts b/packages/twenty-server/src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.module.ts
index da04044d3f34..1e090d1f7364 100644
--- a/packages/twenty-server/src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.module.ts
+++ b/packages/twenty-server/src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.module.ts
@@ -3,12 +3,14 @@ import { Module } from '@nestjs/common';
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
+import { GmailErrorHandlingModule } from 'src/modules/messaging/services/gmail-error-handling/gmail-error-handling.module';
@Module({
imports: [
ObjectMetadataRepositoryModule.forFeature([
ConnectedAccountWorkspaceEntity,
]),
+ GmailErrorHandlingModule,
],
providers: [GoogleAPIRefreshAccessTokenService],
exports: [GoogleAPIRefreshAccessTokenService],
diff --git a/packages/twenty-server/src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service.ts b/packages/twenty-server/src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service.ts
index 3297f0c6b7e4..02a935471573 100644
--- a/packages/twenty-server/src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service.ts
+++ b/packages/twenty-server/src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service.ts
@@ -6,6 +6,9 @@ import { EnvironmentService } from 'src/engine/integrations/environment/environm
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
+import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
+import { GmailErrorHandlingService } from 'src/modules/messaging/services/gmail-error-handling/gmail-error-handling.service';
+import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
@Injectable()
export class GoogleAPIRefreshAccessTokenService {
@@ -13,6 +16,9 @@ export class GoogleAPIRefreshAccessTokenService {
private readonly environmentService: EnvironmentService,
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
private readonly connectedAccountRepository: ConnectedAccountRepository,
+ @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
+ private readonly messageChannelRepository: MessageChannelRepository,
+ private readonly gmailErrorHandlingService: GmailErrorHandlingService,
) {}
async refreshAndSaveAccessToken(
@@ -30,12 +36,6 @@ export class GoogleAPIRefreshAccessTokenService {
);
}
- if (connectedAccount.authFailedAt) {
- throw new Error(
- `Skipping refresh of access token for connected account ${connectedAccountId} in workspace ${workspaceId} because auth already failed, a new refresh token is needed`,
- );
- }
-
const refreshToken = connectedAccount.refreshToken;
if (!refreshToken) {
@@ -44,50 +44,55 @@ export class GoogleAPIRefreshAccessTokenService {
);
}
- const accessToken = await this.refreshAccessToken(
- refreshToken,
- connectedAccountId,
- workspaceId,
- );
-
- await this.connectedAccountRepository.updateAccessToken(
- accessToken,
- connectedAccountId,
- workspaceId,
- );
- }
-
- async refreshAccessToken(
- refreshToken: string,
- connectedAccountId: string,
- workspaceId: string,
- ): Promise {
try {
- const response = await axios.post(
- 'https://oauth2.googleapis.com/token',
- {
- client_id: this.environmentService.get('AUTH_GOOGLE_CLIENT_ID'),
- client_secret: this.environmentService.get(
- 'AUTH_GOOGLE_CLIENT_SECRET',
- ),
- refresh_token: refreshToken,
- grant_type: 'refresh_token',
- },
- {
- headers: {
- 'Content-Type': 'application/json',
- },
- },
- );
+ const accessToken = await this.refreshAccessToken(refreshToken);
- return response.data.access_token;
- } catch (error) {
- await this.connectedAccountRepository.updateAuthFailedAt(
+ await this.connectedAccountRepository.updateAccessToken(
+ accessToken,
connectedAccountId,
workspaceId,
);
+ } catch (error) {
+ const messageChannel =
+ await this.messageChannelRepository.getFirstByConnectedAccountId(
+ connectedAccountId,
+ workspaceId,
+ );
- throw new Error(`Error refreshing access token: ${error.message}`);
+ if (!messageChannel) {
+ throw new Error(
+ `No message channel found for connected account ${connectedAccountId} in workspace ${workspaceId}`,
+ );
+ }
+
+ await this.gmailErrorHandlingService.handleGmailError(
+ {
+ code: error.code,
+ reason: error.response.data.error,
+ },
+ 'messages-import',
+ messageChannel,
+ workspaceId,
+ );
}
}
+
+ async refreshAccessToken(refreshToken: string): Promise {
+ const response = await axios.post(
+ 'https://oauth2.googleapis.com/token',
+ {
+ client_id: this.environmentService.get('AUTH_GOOGLE_CLIENT_ID'),
+ client_secret: this.environmentService.get('AUTH_GOOGLE_CLIENT_SECRET'),
+ refresh_token: refreshToken,
+ grant_type: 'refresh_token',
+ },
+ {
+ headers: {
+ 'Content-Type': 'application/json',
+ },
+ },
+ );
+
+ return response.data.access_token;
+ }
}
diff --git a/packages/twenty-server/src/modules/messaging/crons/commands/gmail-message-list-fetch.cron.command.ts b/packages/twenty-server/src/modules/messaging/crons/commands/gmail-message-list-fetch.cron.command.ts
index da419abd5a67..4091eedee2a5 100644
--- a/packages/twenty-server/src/modules/messaging/crons/commands/gmail-message-list-fetch.cron.command.ts
+++ b/packages/twenty-server/src/modules/messaging/crons/commands/gmail-message-list-fetch.cron.command.ts
@@ -6,7 +6,7 @@ import { MessageQueue } from 'src/engine/integrations/message-queue/message-queu
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { GmailMessageListFetchCronJob } from 'src/modules/messaging/crons/jobs/gmail-message-list-fetch.cron.job';
-const GMAIL_PARTIAL_SYNC_CRON_PATTERN = '*/5 * * * *';
+const GMAIL_MESSAGE_LIST_FETCH_CRON_PATTERN = '*/5 * * * *';
@Command({
name: 'cron:messaging:gmail-message-list-fetch',
@@ -26,7 +26,7 @@ export class GmailMessageListFetchCronCommand extends CommandRunner {
GmailMessageListFetchCronJob.name,
undefined,
{
- repeat: { pattern: GMAIL_PARTIAL_SYNC_CRON_PATTERN },
+ repeat: { pattern: GMAIL_MESSAGE_LIST_FETCH_CRON_PATTERN },
},
);
}
diff --git a/packages/twenty-server/src/modules/messaging/crons/jobs/gmail-message-list-fetch.cron.job.ts b/packages/twenty-server/src/modules/messaging/crons/jobs/gmail-message-list-fetch.cron.job.ts
index 6d1ba52f1feb..c8266987d262 100644
--- a/packages/twenty-server/src/modules/messaging/crons/jobs/gmail-message-list-fetch.cron.job.ts
+++ b/packages/twenty-server/src/modules/messaging/crons/jobs/gmail-message-list-fetch.cron.job.ts
@@ -99,9 +99,6 @@ export class GmailMessageListFetchCronJob
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
},
- {
- retryLimit: 2,
- },
);
} else {
await this.messageQueueService.add(
@@ -110,9 +107,6 @@ export class GmailMessageListFetchCronJob
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
},
- {
- retryLimit: 2,
- },
);
}
}
diff --git a/packages/twenty-server/src/modules/messaging/crons/jobs/gmail-messages-import.cron.job.ts b/packages/twenty-server/src/modules/messaging/crons/jobs/gmail-messages-import.cron.job.ts
index 9d1ea699d2d0..c34be9edf404 100644
--- a/packages/twenty-server/src/modules/messaging/crons/jobs/gmail-messages-import.cron.job.ts
+++ b/packages/twenty-server/src/modules/messaging/crons/jobs/gmail-messages-import.cron.job.ts
@@ -19,6 +19,7 @@ import {
} from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
+import { MessagingTelemetryService } from 'src/modules/messaging/services/telemetry/messaging-telemetry.service';
@Injectable()
export class GmailMessagesImportCronJob implements MessageQueueJob {
@@ -38,6 +39,7 @@ export class GmailMessagesImportCronJob implements MessageQueueJob {
private readonly environmentService: EnvironmentService,
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
private readonly connectedAccountRepository: ConnectedAccountRepository,
+ private readonly messagingTelemetryService: MessagingTelemetryService,
) {}
async handle(): Promise {
@@ -86,21 +88,24 @@ export class GmailMessagesImportCronJob implements MessageQueueJob {
}
if (isGmailSyncV2Enabled) {
- try {
- const connectedAccount =
- await this.connectedAccountRepository.getConnectedAccountOrThrow(
- workspaceId,
- messageChannel.connectedAccountId,
- );
-
- await this.gmailFetchMessageContentFromCacheV2Service.processMessageBatchImport(
- messageChannel,
- connectedAccount,
+ await this.messagingTelemetryService.track({
+ eventName: 'messages_import.triggered',
+ workspaceId,
+ connectedAccountId: messageChannel.connectedAccountId,
+ messageChannelId: messageChannel.id,
+ });
+
+ const connectedAccount =
+ await this.connectedAccountRepository.getConnectedAccountOrThrow(
workspaceId,
+ messageChannel.connectedAccountId,
);
- } catch (error) {
- this.logger.log(error.message);
- }
+
+ await this.gmailFetchMessageContentFromCacheV2Service.processMessageBatchImport(
+ messageChannel,
+ connectedAccount,
+ workspaceId,
+ );
} else {
await this.gmailFetchMessageContentFromCacheService.fetchMessageContentFromCache(
workspaceId,
diff --git a/packages/twenty-server/src/modules/messaging/crons/jobs/messaging-cron-job.module.ts b/packages/twenty-server/src/modules/messaging/crons/jobs/messaging-cron-job.module.ts
index 7ef4fedf8cfd..b54d9cb34430 100644
--- a/packages/twenty-server/src/modules/messaging/crons/jobs/messaging-cron-job.module.ts
+++ b/packages/twenty-server/src/modules/messaging/crons/jobs/messaging-cron-job.module.ts
@@ -9,6 +9,7 @@ import { GmailMessagesImportCronJob } from 'src/modules/messaging/crons/jobs/gma
import { GmailMessageListFetchCronJob } from 'src/modules/messaging/crons/jobs/gmail-message-list-fetch.cron.job';
import { GmailMessagesImportModule } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import.module';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
+import { MessagingTelemetryModule } from 'src/modules/messaging/services/telemetry/messaging-telemetry.module';
@Module({
imports: [
@@ -16,6 +17,7 @@ import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-ob
TypeOrmModule.forFeature([DataSourceEntity], 'metadata'),
ObjectMetadataRepositoryModule.forFeature([MessageChannelWorkspaceEntity]),
GmailMessagesImportModule,
+ MessagingTelemetryModule,
],
providers: [
{
diff --git a/packages/twenty-server/src/modules/messaging/jobs/blocklist-reimport-messages.job.ts b/packages/twenty-server/src/modules/messaging/jobs/blocklist-reimport-messages.job.ts
index 44ffeb4d099c..485a0a06b7f8 100644
--- a/packages/twenty-server/src/modules/messaging/jobs/blocklist-reimport-messages.job.ts
+++ b/packages/twenty-server/src/modules/messaging/jobs/blocklist-reimport-messages.job.ts
@@ -22,7 +22,7 @@ export class BlocklistReimportMessagesJob
constructor(
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
private readonly connectedAccountRepository: ConnectedAccountRepository,
- private readonly gmailFullSyncService: GmailFullMessageListFetchService,
+ private readonly gmailFullMessageListFetchService: GmailFullMessageListFetchService,
) {}
async handle(data: BlocklistReimportMessagesJobData): Promise {
@@ -46,7 +46,7 @@ export class BlocklistReimportMessagesJob
return;
}
- await this.gmailFullSyncService.fetchConnectedAccountThreads(
+ await this.gmailFullMessageListFetchService.fetchConnectedAccountThreads(
workspaceId,
connectedAccount[0].id,
[handle],
diff --git a/packages/twenty-server/src/modules/messaging/jobs/gmail-full-message-list-fetch.job.ts b/packages/twenty-server/src/modules/messaging/jobs/gmail-full-message-list-fetch.job.ts
index 14d364fd0c92..d4a82435bdc8 100644
--- a/packages/twenty-server/src/modules/messaging/jobs/gmail-full-message-list-fetch.job.ts
+++ b/packages/twenty-server/src/modules/messaging/jobs/gmail-full-message-list-fetch.job.ts
@@ -1,9 +1,23 @@
import { Injectable, Logger } from '@nestjs/common';
+import { InjectRepository } from '@nestjs/typeorm';
+
+import { Repository } from 'typeorm';
import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';
+import {
+ FeatureFlagEntity,
+ FeatureFlagKeys,
+} from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service';
import { GmailFullMessageListFetchService } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.service';
+import { GmailFullMessageListFetchV2Service } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch-v2.service';
+import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
+import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
+import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
+import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
+import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
+import { MessagingTelemetryService } from 'src/modules/messaging/services/telemetry/messaging-telemetry.service';
export type GmailFullMessageListFetchJobData = {
workspaceId: string;
@@ -18,31 +32,95 @@ export class GmailFullMessageListFetchJob
constructor(
private readonly googleAPIsRefreshAccessTokenService: GoogleAPIRefreshAccessTokenService,
- private readonly gmailFullSyncService: GmailFullMessageListFetchService,
+ private readonly gmailFullMessageListFetchService: GmailFullMessageListFetchService,
+ @InjectRepository(FeatureFlagEntity, 'core')
+ private readonly featureFlagRepository: Repository,
+ private readonly gmailFullMessageListFetchV2Service: GmailFullMessageListFetchV2Service,
+ @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
+ private readonly connectedAccountRepository: ConnectedAccountRepository,
+ @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
+ private readonly messageChannelRepository: MessageChannelRepository,
+ private readonly messagingTelemetryService: MessagingTelemetryService,
) {}
async handle(data: GmailFullMessageListFetchJobData): Promise {
+ const { workspaceId, connectedAccountId } = data;
+
this.logger.log(
- `gmail full-sync for workspace ${data.workspaceId} and account ${data.connectedAccountId}`,
+ `gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId}`,
);
try {
await this.googleAPIsRefreshAccessTokenService.refreshAndSaveAccessToken(
- data.workspaceId,
- data.connectedAccountId,
+ workspaceId,
+ connectedAccountId,
);
} catch (e) {
this.logger.error(
- `Error refreshing access token for connected account ${data.connectedAccountId} in workspace ${data.workspaceId}`,
+ `Error refreshing access token for connected account ${connectedAccountId} in workspace ${workspaceId}`,
e,
);
return;
}
- await this.gmailFullSyncService.fetchConnectedAccountThreads(
- data.workspaceId,
- data.connectedAccountId,
- );
+ const isGmailSyncV2EnabledFeatureFlag =
+ await this.featureFlagRepository.findOneBy({
+ workspaceId: workspaceId,
+ key: FeatureFlagKeys.IsGmailSyncV2Enabled,
+ value: true,
+ });
+
+ const isGmailSyncV2Enabled = isGmailSyncV2EnabledFeatureFlag?.value;
+
+ if (isGmailSyncV2Enabled) {
+ // Todo delete this code block after migration
+ const connectedAccount = await this.connectedAccountRepository.getById(
+ connectedAccountId,
+ workspaceId,
+ );
+
+ if (!connectedAccount) {
+ throw new Error(
+ `Connected account ${connectedAccountId} not found in workspace ${workspaceId}`,
+ );
+ }
+
+ const messageChannel =
+ await this.messageChannelRepository.getFirstByConnectedAccountId(
+ connectedAccountId,
+ workspaceId,
+ );
+
+ if (!messageChannel) {
+ throw new Error(
+ `No message channel found for connected account ${connectedAccountId} in workspace ${workspaceId}`,
+ );
+ }
+
+ await this.messagingTelemetryService.track({
+ eventName: 'full_message_list_fetch.started',
+ workspaceId,
+ connectedAccountId,
+ });
+
+ await this.gmailFullMessageListFetchV2Service.processMessageListFetch(
+ messageChannel,
+ connectedAccount,
+ workspaceId,
+ );
+
+ await this.messagingTelemetryService.track({
+ eventName: 'full_message_list_fetch.completed',
+ workspaceId,
+ connectedAccountId,
+ messageChannelId: messageChannel.id,
+ });
+ } else {
+ await this.gmailFullMessageListFetchService.fetchConnectedAccountThreads(
+ data.workspaceId,
+ data.connectedAccountId,
+ );
+ }
}
}
diff --git a/packages/twenty-server/src/modules/messaging/jobs/gmail-message-list-fetch.job.ts b/packages/twenty-server/src/modules/messaging/jobs/gmail-message-list-fetch.job.ts
index ca638890e33a..36c86faedfc2 100644
--- a/packages/twenty-server/src/modules/messaging/jobs/gmail-message-list-fetch.job.ts
+++ b/packages/twenty-server/src/modules/messaging/jobs/gmail-message-list-fetch.job.ts
@@ -2,11 +2,17 @@ import { Injectable, Logger } from '@nestjs/common';
import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';
-import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service';
import { GmailPartialMessageListFetchV2Service } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-v2.service';
-import { GetConnectedAccountAndMessageChannelService } from 'src/modules/messaging/services/get-connected-account-and-message-channel/get-connected-account-and-message-channel.service';
-import { MessageChannelSyncSubStatus } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
-import { GmailFullMessageListFetchService } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.service';
+import {
+ MessageChannelSyncSubStatus,
+ MessageChannelWorkspaceEntity,
+} from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
+import { GmailFullMessageListFetchV2Service } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch-v2.service';
+import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
+import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
+import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
+import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
+import { MessagingTelemetryService } from 'src/modules/messaging/services/telemetry/messaging-telemetry.service';
export type GmailMessageListFetchJobData = {
workspaceId: string;
@@ -20,78 +26,110 @@ export class GmailMessageListFetchJob
private readonly logger = new Logger(GmailMessageListFetchJob.name);
constructor(
- private readonly googleAPIsRefreshAccessTokenService: GoogleAPIRefreshAccessTokenService,
- private readonly gmailFullSyncService: GmailFullMessageListFetchService,
+ private readonly gmailFullMessageListFetchV2Service: GmailFullMessageListFetchV2Service,
private readonly gmailPartialMessageListFetchV2Service: GmailPartialMessageListFetchV2Service,
- private readonly getConnectedAccountAndMessageChannelService: GetConnectedAccountAndMessageChannelService,
+ @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
+ private readonly connectedAccountRepository: ConnectedAccountRepository,
+ @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
+ private readonly messageChannelRepository: MessageChannelRepository,
+ private readonly messagingTelemetryService: MessagingTelemetryService,
) {}
async handle(data: GmailMessageListFetchJobData): Promise {
const { workspaceId, connectedAccountId } = data;
- this.logger.log(
- `Fetch gmail message list for workspace ${workspaceId} and account ${connectedAccountId}`,
+ await this.messagingTelemetryService.track({
+ eventName: 'message_list_fetch_job.triggered',
+ workspaceId,
+ connectedAccountId,
+ });
+
+ const connectedAccount = await this.connectedAccountRepository.getById(
+ connectedAccountId,
+ workspaceId,
);
- try {
- await this.googleAPIsRefreshAccessTokenService.refreshAndSaveAccessToken(
+ if (!connectedAccount) {
+ await this.messagingTelemetryService.track({
+ eventName: 'message_list_fetch_job.error.connected_account_not_found',
workspaceId,
connectedAccountId,
- );
- } catch (e) {
- this.logger.error(
- `Error refreshing access token for connected account ${connectedAccountId} in workspace ${workspaceId}`,
- e,
- );
+ });
return;
}
- const { messageChannel, connectedAccount } =
- await this.getConnectedAccountAndMessageChannelService.getConnectedAccountAndMessageChannelOrThrow(
- workspaceId,
+ const messageChannel =
+ await this.messageChannelRepository.getFirstByConnectedAccountId(
connectedAccountId,
+ workspaceId,
);
+ if (!messageChannel) {
+ await this.messagingTelemetryService.track({
+ eventName: 'message_list_fetch_job.error.message_channel_not_found',
+ workspaceId,
+ connectedAccountId,
+ });
+
+ return;
+ }
+
switch (messageChannel.syncSubStatus) {
- case MessageChannelSyncSubStatus.PARTIAL_MESSAGES_LIST_FETCH_PENDING:
- try {
- await this.gmailPartialMessageListFetchV2Service.processMessageListFetch(
- messageChannel,
- connectedAccount,
- workspaceId,
- );
- } catch (e) {
- this.logger.error(e);
- }
-
- return;
-
- case MessageChannelSyncSubStatus.FULL_MESSAGES_LIST_FETCH_PENDING:
- try {
- await this.gmailFullSyncService.fetchConnectedAccountThreads(
- workspaceId,
- connectedAccountId,
- );
- } catch (e) {
- this.logger.error(e);
- }
-
- return;
-
- case MessageChannelSyncSubStatus.FAILED:
- this.logger.error(
- `Messaging import for workspace ${workspaceId} and account ${connectedAccountId} is in a failed state.`,
+ case MessageChannelSyncSubStatus.PARTIAL_MESSAGE_LIST_FETCH_PENDING:
+ this.logger.log(
+ `Fetching partial message list for workspace ${workspaceId} and account ${connectedAccount.id}`,
);
- return;
+ await this.messagingTelemetryService.track({
+ eventName: 'partial_message_list_fetch.started',
+ workspaceId,
+ connectedAccountId,
+ });
- default:
- this.logger.error(
- `Messaging import for workspace ${workspaceId} and account ${connectedAccountId} is locked, import will be retried later.`,
+ await this.gmailPartialMessageListFetchV2Service.processMessageListFetch(
+ messageChannel,
+ connectedAccount,
+ workspaceId,
+ );
+
+ await this.messagingTelemetryService.track({
+ eventName: 'partial_message_list_fetch.completed',
+ workspaceId,
+ connectedAccountId,
+ messageChannelId: messageChannel.id,
+ });
+
+ break;
+
+ case MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING:
+ this.logger.log(
+ `Fetching full message list for workspace ${workspaceId} and account ${connectedAccount.id}`,
);
- return;
+ await this.messagingTelemetryService.track({
+ eventName: 'full_message_list_fetch.started',
+ workspaceId,
+ connectedAccountId,
+ });
+
+ await this.gmailFullMessageListFetchV2Service.processMessageListFetch(
+ messageChannel,
+ connectedAccount,
+ workspaceId,
+ );
+
+ await this.messagingTelemetryService.track({
+ eventName: 'full_message_list_fetch.completed',
+ workspaceId,
+ connectedAccountId,
+ messageChannelId: messageChannel.id,
+ });
+
+ break;
+
+ default:
+ break;
}
}
}
diff --git a/packages/twenty-server/src/modules/messaging/jobs/messaging-job.module.ts b/packages/twenty-server/src/modules/messaging/jobs/messaging-job.module.ts
index 820ebf476de7..63b78ffe3010 100644
--- a/packages/twenty-server/src/modules/messaging/jobs/messaging-job.module.ts
+++ b/packages/twenty-server/src/modules/messaging/jobs/messaging-job.module.ts
@@ -14,9 +14,10 @@ import { GmailFullMessageListFetchJob } from 'src/modules/messaging/jobs/gmail-f
import { GmailMessageListFetchJob } from 'src/modules/messaging/jobs/gmail-message-list-fetch.job';
import { GmailPartialMessageListFetchJob } from 'src/modules/messaging/jobs/gmail-partial-message-list-fetch.job';
import { MessagingCreateCompanyAndContactAfterSyncJob } from 'src/modules/messaging/jobs/messaging-create-company-and-contact-after-sync.job';
-import { GetConnectedAccountAndMessageChannelModule } from 'src/modules/messaging/services/get-connected-account-and-message-channel/get-connected-account-and-message-channel.module';
import { GmailFullMessageListFetchModule } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.module';
import { GmailPartialMessageListFetchModule } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.module';
+import { SetMessageChannelSyncStatusModule } from 'src/modules/messaging/services/message-channel-sync-status/message-channel-sync-status.module';
+import { MessagingTelemetryModule } from 'src/modules/messaging/services/telemetry/messaging-telemetry.module';
import { ThreadCleanerModule } from 'src/modules/messaging/services/thread-cleaner/thread-cleaner.module';
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel-message-association.workspace-entity';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
@@ -31,13 +32,14 @@ import { MessageParticipantWorkspaceEntity } from 'src/modules/messaging/standar
MessageChannelMessageAssociationWorkspaceEntity,
BlocklistWorkspaceEntity,
]),
+ MessagingTelemetryModule,
GmailFullMessageListFetchModule,
GmailPartialMessageListFetchModule,
ThreadCleanerModule,
GoogleAPIRefreshAccessTokenModule,
AutoCompaniesAndContactsCreationModule,
TypeOrmModule.forFeature([FeatureFlagEntity], 'core'),
- GetConnectedAccountAndMessageChannelModule,
+ SetMessageChannelSyncStatusModule,
],
providers: [
{
diff --git a/packages/twenty-server/src/modules/messaging/services/get-connected-account-and-message-channel/get-connected-account-and-message-channel.module.ts b/packages/twenty-server/src/modules/messaging/services/get-connected-account-and-message-channel/get-connected-account-and-message-channel.module.ts
deleted file mode 100644
index 137fc7a4b183..000000000000
--- a/packages/twenty-server/src/modules/messaging/services/get-connected-account-and-message-channel/get-connected-account-and-message-channel.module.ts
+++ /dev/null
@@ -1,18 +0,0 @@
-import { Module } from '@nestjs/common';
-
-import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
-import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
-import { GetConnectedAccountAndMessageChannelService } from 'src/modules/messaging/services/get-connected-account-and-message-channel/get-connected-account-and-message-channel.service';
-import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
-
-@Module({
- imports: [
- ObjectMetadataRepositoryModule.forFeature([
- ConnectedAccountWorkspaceEntity,
- MessageChannelWorkspaceEntity,
- ]),
- ],
- providers: [GetConnectedAccountAndMessageChannelService],
- exports: [GetConnectedAccountAndMessageChannelService],
-})
-export class GetConnectedAccountAndMessageChannelModule {}
diff --git a/packages/twenty-server/src/modules/messaging/services/get-connected-account-and-message-channel/get-connected-account-and-message-channel.service.ts b/packages/twenty-server/src/modules/messaging/services/get-connected-account-and-message-channel/get-connected-account-and-message-channel.service.ts
deleted file mode 100644
index 441696ea1356..000000000000
--- a/packages/twenty-server/src/modules/messaging/services/get-connected-account-and-message-channel/get-connected-account-and-message-channel.service.ts
+++ /dev/null
@@ -1,62 +0,0 @@
-import { Injectable } from '@nestjs/common';
-
-import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
-import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
-import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
-import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
-import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
-import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
-
-@Injectable()
-export class GetConnectedAccountAndMessageChannelService {
- constructor(
- @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
- private readonly connectedAccountRepository: ConnectedAccountRepository,
- @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
- private readonly messageChannelRepository: MessageChannelRepository,
- ) {}
-
- public async getConnectedAccountAndMessageChannelOrThrow(
- workspaceId: string,
- connectedAccountId: string,
- ): Promise<{
- messageChannel: ObjectRecord;
- connectedAccount: ObjectRecord;
- }> {
- const connectedAccount = await this.connectedAccountRepository.getById(
- connectedAccountId,
- workspaceId,
- );
-
- if (!connectedAccount) {
- throw new Error(
- `Connected account ${connectedAccountId} not found in workspace ${workspaceId}`,
- );
- }
-
- const refreshToken = connectedAccount.refreshToken;
-
- if (!refreshToken) {
- throw new Error(
- `No refresh token found for connected account ${connectedAccountId} in workspace ${workspaceId}`,
- );
- }
-
- const messageChannel =
- await this.messageChannelRepository.getFirstByConnectedAccountId(
- connectedAccountId,
- workspaceId,
- );
-
- if (!messageChannel) {
- throw new Error(
- `No message channel found for connected account ${connectedAccountId} in workspace ${workspaceId}`,
- );
- }
-
- return {
- messageChannel,
- connectedAccount,
- };
- }
-}
diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-error-handling/gmail-error-handling.module.ts b/packages/twenty-server/src/modules/messaging/services/gmail-error-handling/gmail-error-handling.module.ts
new file mode 100644
index 000000000000..915fac67fdc8
--- /dev/null
+++ b/packages/twenty-server/src/modules/messaging/services/gmail-error-handling/gmail-error-handling.module.ts
@@ -0,0 +1,12 @@
+import { Module } from '@nestjs/common';
+
+import { GmailErrorHandlingService } from 'src/modules/messaging/services/gmail-error-handling/gmail-error-handling.service';
+import { SetMessageChannelSyncStatusModule } from 'src/modules/messaging/services/message-channel-sync-status/message-channel-sync-status.module';
+import { MessagingTelemetryModule } from 'src/modules/messaging/services/telemetry/messaging-telemetry.module';
+
+@Module({
+ imports: [SetMessageChannelSyncStatusModule, MessagingTelemetryModule],
+ providers: [GmailErrorHandlingService],
+ exports: [GmailErrorHandlingService],
+})
+export class GmailErrorHandlingModule {}
diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-error-handling/gmail-error-handling.service.ts b/packages/twenty-server/src/modules/messaging/services/gmail-error-handling/gmail-error-handling.service.ts
new file mode 100644
index 000000000000..b0f96a8cd12c
--- /dev/null
+++ b/packages/twenty-server/src/modules/messaging/services/gmail-error-handling/gmail-error-handling.service.ts
@@ -0,0 +1,192 @@
+import { Injectable } from '@nestjs/common';
+
+import snakeCase from 'lodash.snakecase';
+
+import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
+import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
+import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
+import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
+import { MessageChannelSyncStatusService } from 'src/modules/messaging/services/message-channel-sync-status/message-channel-sync-status.service';
+import { MessagingTelemetryService } from 'src/modules/messaging/services/telemetry/messaging-telemetry.service';
+import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
+
+type SyncStep =
+ | 'partial-message-list-fetch'
+ | 'full-message-list-fetch'
+ | 'messages-import';
+
+export type GmailError = {
+ code: number;
+ reason: string;
+};
+
+@Injectable()
+export class GmailErrorHandlingService {
+ constructor(
+ @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
+ private readonly connectedAccountRepository: ConnectedAccountRepository,
+ private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService,
+ private readonly messagingTelemetryService: MessagingTelemetryService,
+ ) {}
+
+ public async handleGmailError(
+ error: GmailError,
+ syncStep: SyncStep,
+ messageChannel: ObjectRecord,
+ workspaceId: string,
+ ): Promise {
+ const { code, reason } = error;
+
+ switch (code) {
+ case 400:
+ if (reason === 'invalid_grant') {
+ await this.handleInsufficientPermissions(
+ error,
+ syncStep,
+ messageChannel,
+ workspaceId,
+ );
+ }
+ break;
+ case 404:
+ await this.handleNotFound(error, syncStep, messageChannel, workspaceId);
+ break;
+
+ case 429:
+ await this.handleRateLimitExceeded(
+ error,
+ syncStep,
+ messageChannel,
+ workspaceId,
+ );
+ break;
+
+ case 403:
+ if (
+ reason === 'rateLimitExceeded' ||
+ reason === 'userRateLimitExceeded'
+ ) {
+ await this.handleRateLimitExceeded(
+ error,
+ syncStep,
+ messageChannel,
+ workspaceId,
+ );
+ } else {
+ await this.handleInsufficientPermissions(
+ error,
+ syncStep,
+ messageChannel,
+ workspaceId,
+ );
+ }
+ break;
+
+ case 401:
+ await this.handleInsufficientPermissions(
+ error,
+ syncStep,
+ messageChannel,
+ workspaceId,
+ );
+ break;
+
+ default:
+ await this.messageChannelSyncStatusService.markAsFailedUnknownAndFlushMessagesToImport(
+ messageChannel.id,
+ workspaceId,
+ );
+ break;
+ }
+ }
+
+ public async handleRateLimitExceeded(
+ error: GmailError,
+ syncStep: SyncStep,
+ messageChannel: ObjectRecord,
+ workspaceId: string,
+ ): Promise {
+ await this.messagingTelemetryService.track({
+ eventName: `${snakeCase(syncStep)}.error.rate_limit_exceeded`,
+ workspaceId,
+ connectedAccountId: messageChannel.connectedAccountId,
+ messageChannelId: messageChannel.id,
+ message: `${error.code}: ${error.reason}`,
+ });
+
+ switch (syncStep) {
+ case 'full-message-list-fetch':
+ await this.messageChannelSyncStatusService.scheduleFullMessageListFetch(
+ messageChannel.id,
+ workspaceId,
+ );
+ break;
+
+ case 'partial-message-list-fetch':
+ await this.messageChannelSyncStatusService.schedulePartialMessageListFetch(
+ messageChannel.id,
+ workspaceId,
+ );
+ break;
+
+ case 'messages-import':
+ await this.messageChannelSyncStatusService.scheduleMessagesImport(
+ messageChannel.id,
+ workspaceId,
+ );
+ break;
+
+ default:
+ break;
+ }
+ }
+
+ public async handleInsufficientPermissions(
+ error: GmailError,
+ syncStep: SyncStep,
+ messageChannel: ObjectRecord,
+ workspaceId: string,
+ ): Promise {
+ await this.messagingTelemetryService.track({
+ eventName: `${snakeCase(syncStep)}.error.insufficient_permissions`,
+ workspaceId,
+ connectedAccountId: messageChannel.connectedAccountId,
+ messageChannelId: messageChannel.id,
+ message: `${error.code}: ${error.reason}`,
+ });
+
+ await this.messageChannelSyncStatusService.markAsFailedInsufficientPermissionsAndFlushMessagesToImport(
+ messageChannel.id,
+ workspaceId,
+ );
+
+ await this.connectedAccountRepository.updateAuthFailedAt(
+ messageChannel.connectedAccountId,
+ workspaceId,
+ );
+ }
+
+ public async handleNotFound(
+ error: GmailError,
+ syncStep: SyncStep,
+ messageChannel: ObjectRecord,
+ workspaceId: string,
+ ): Promise {
+ if (syncStep === 'messages-import') {
+ return;
+ }
+
+ await this.messagingTelemetryService.track({
+ eventName: `${snakeCase(syncStep)}.error.not_found`,
+ workspaceId,
+ connectedAccountId: messageChannel.connectedAccountId,
+ messageChannelId: messageChannel.id,
+ message: `404: ${error.reason}`,
+ });
+
+ await this.messageChannelSyncStatusService.resetAndScheduleFullMessageListFetch(
+ messageChannel.id,
+ workspaceId,
+ );
+ }
+}
diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch-v2.service.ts b/packages/twenty-server/src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch-v2.service.ts
new file mode 100644
index 000000000000..87e85b0a0032
--- /dev/null
+++ b/packages/twenty-server/src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch-v2.service.ts
@@ -0,0 +1,208 @@
+import { Injectable, Logger } from '@nestjs/common';
+
+import { EntityManager } from 'typeorm';
+import { gmail_v1 } from 'googleapis';
+import { GaxiosResponse } from 'gaxios';
+
+import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service';
+import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator';
+import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum';
+import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
+import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
+import { GMAIL_USERS_MESSAGES_LIST_MAX_RESULT } from 'src/modules/messaging/constants/gmail-users-messages-list-max-result.constant';
+import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/repositories/message-channel-message-association.repository';
+import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
+import { GmailClientProvider } from 'src/modules/messaging/services/providers/gmail/gmail-client.provider';
+import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel-message-association.workspace-entity';
+import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
+import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
+import {
+ GmailError,
+ GmailErrorHandlingService,
+} from 'src/modules/messaging/services/gmail-error-handling/gmail-error-handling.service';
+import { MessageChannelSyncStatusService } from 'src/modules/messaging/services/message-channel-sync-status/message-channel-sync-status.service';
+
+@Injectable()
+export class GmailFullMessageListFetchV2Service {
+ private readonly logger = new Logger(GmailFullMessageListFetchV2Service.name);
+
+ constructor(
+ private readonly gmailClientProvider: GmailClientProvider,
+ @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
+ private readonly messageChannelRepository: MessageChannelRepository,
+ @InjectCacheStorage(CacheStorageNamespace.Messaging)
+ private readonly cacheStorage: CacheStorageService,
+ @InjectObjectMetadataRepository(
+ MessageChannelMessageAssociationWorkspaceEntity,
+ )
+ private readonly messageChannelMessageAssociationRepository: MessageChannelMessageAssociationRepository,
+ private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService,
+ private readonly gmailErrorHandlingService: GmailErrorHandlingService,
+ ) {}
+
+ public async processMessageListFetch(
+ messageChannel: ObjectRecord,
+ connectedAccount: ObjectRecord,
+ workspaceId: string,
+ ) {
+ await this.messageChannelSyncStatusService.markAsMessagesListFetchOngoing(
+ messageChannel.id,
+ workspaceId,
+ );
+
+ const gmailClient: gmail_v1.Gmail =
+ await this.gmailClientProvider.getGmailClient(
+ connectedAccount.refreshToken,
+ );
+
+ const { error: gmailError } =
+ await this.fetchAllMessageIdsFromGmailAndStoreInCache(
+ gmailClient,
+ messageChannel.id,
+ workspaceId,
+ );
+
+ if (gmailError) {
+ await this.gmailErrorHandlingService.handleGmailError(
+ gmailError,
+ 'full-message-list-fetch',
+ messageChannel,
+ workspaceId,
+ );
+
+ return;
+ }
+
+ await this.messageChannelSyncStatusService.scheduleMessagesImport(
+ messageChannel.id,
+ workspaceId,
+ );
+ }
+
+ private async fetchAllMessageIdsFromGmailAndStoreInCache(
+ gmailClient: gmail_v1.Gmail,
+ messageChannelId: string,
+ workspaceId: string,
+ transactionManager?: EntityManager,
+ ): Promise<{ error?: GmailError }> {
+ let pageToken: string | undefined;
+ let fetchedMessageIdsCount = 0;
+ let hasMoreMessages = true;
+ let firstMessageExternalId: string | undefined;
+ let response: GaxiosResponse;
+
+ while (hasMoreMessages) {
+ try {
+ response = await gmailClient.users.messages.list({
+ userId: 'me',
+ maxResults: GMAIL_USERS_MESSAGES_LIST_MAX_RESULT,
+ pageToken,
+ });
+ } catch (error) {
+ return {
+ error: {
+ code: error.response?.status,
+ reason: error.response?.data?.error,
+ },
+ };
+ }
+
+ if (response.data?.messages) {
+ const messageExternalIds = response.data.messages
+ .filter((message): message is { id: string } => message.id != null)
+ .map((message) => message.id);
+
+ if (!firstMessageExternalId) {
+ firstMessageExternalId = messageExternalIds[0];
+ }
+
+ const existingMessageChannelMessageAssociations =
+ await this.messageChannelMessageAssociationRepository.getByMessageExternalIdsAndMessageChannelId(
+ messageExternalIds,
+ messageChannelId,
+ workspaceId,
+ transactionManager,
+ );
+
+ const existingMessageChannelMessageAssociationsExternalIds =
+ existingMessageChannelMessageAssociations.map(
+ (messageChannelMessageAssociation) =>
+ messageChannelMessageAssociation.messageExternalId,
+ );
+
+ const messageIdsToImport = messageExternalIds.filter(
+ (messageExternalId) =>
+ !existingMessageChannelMessageAssociationsExternalIds.includes(
+ messageExternalId,
+ ),
+ );
+
+ if (messageIdsToImport.length) {
+ await this.cacheStorage.setAdd(
+ `messages-to-import:${workspaceId}:gmail:${messageChannelId}`,
+ messageIdsToImport,
+ );
+ }
+
+ fetchedMessageIdsCount += messageExternalIds.length;
+ }
+
+ pageToken = response.data.nextPageToken ?? undefined;
+ hasMoreMessages = !!pageToken;
+ }
+
+ this.logger.log(
+ `Added ${fetchedMessageIdsCount} messages ids from Gmail for messageChannel ${messageChannelId} in workspace ${workspaceId} and added to cache for import`,
+ );
+
+ if (!firstMessageExternalId) {
+ throw new Error(
+ `No first message found for workspace ${workspaceId} and account ${messageChannelId}, can't update sync external id`,
+ );
+ }
+
+ await this.updateLastSyncCursor(
+ gmailClient,
+ messageChannelId,
+ firstMessageExternalId,
+ workspaceId,
+ transactionManager,
+ );
+
+ return {};
+ }
+
+ private async updateLastSyncCursor(
+ gmailClient: gmail_v1.Gmail,
+ messageChannelId: string,
+ firstMessageExternalId: string,
+ workspaceId: string,
+ transactionManager?: EntityManager,
+ ) {
+ const firstMessageContent = await gmailClient.users.messages.get({
+ userId: 'me',
+ id: firstMessageExternalId,
+ });
+
+ if (!firstMessageContent?.data) {
+ throw new Error(
+ `No first message content found for message ${firstMessageExternalId} in workspace ${workspaceId}`,
+ );
+ }
+
+ const historyId = firstMessageContent?.data?.historyId;
+
+ if (!historyId) {
+ throw new Error(
+ `No historyId found for message ${firstMessageExternalId} in workspace ${workspaceId}`,
+ );
+ }
+
+ await this.messageChannelRepository.updateLastSyncCursorIfHigher(
+ messageChannelId,
+ historyId,
+ workspaceId,
+ transactionManager,
+ );
+ }
+}
diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.module.ts b/packages/twenty-server/src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.module.ts
index b782de73aa9f..e5733ffc44d7 100644
--- a/packages/twenty-server/src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.module.ts
+++ b/packages/twenty-server/src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.module.ts
@@ -7,7 +7,10 @@ import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/works
import { BlocklistWorkspaceEntity } from 'src/modules/connected-account/standard-objects/blocklist.workspace-entity';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { FetchMessagesByBatchesModule } from 'src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.module';
+import { GmailErrorHandlingModule } from 'src/modules/messaging/services/gmail-error-handling/gmail-error-handling.module';
+import { GmailFullMessageListFetchV2Service } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch-v2.service';
import { GmailFullMessageListFetchService } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.service';
+import { SetMessageChannelSyncStatusModule } from 'src/modules/messaging/services/message-channel-sync-status/message-channel-sync-status.module';
import { MessagingProvidersModule } from 'src/modules/messaging/services/providers/messaging-providers.module';
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel-message-association.workspace-entity';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
@@ -16,6 +19,7 @@ import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-ob
imports: [
MessagingProvidersModule,
FetchMessagesByBatchesModule,
+ GmailErrorHandlingModule,
ObjectMetadataRepositoryModule.forFeature([
ConnectedAccountWorkspaceEntity,
MessageChannelWorkspaceEntity,
@@ -24,8 +28,15 @@ import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-ob
]),
TypeOrmModule.forFeature([FeatureFlagEntity], 'core'),
WorkspaceDataSourceModule,
+ SetMessageChannelSyncStatusModule,
+ ],
+ providers: [
+ GmailFullMessageListFetchService,
+ GmailFullMessageListFetchV2Service,
+ ],
+ exports: [
+ GmailFullMessageListFetchService,
+ GmailFullMessageListFetchV2Service,
],
- providers: [GmailFullMessageListFetchService],
- exports: [GmailFullMessageListFetchService],
})
export class GmailFullMessageListFetchModule {}
diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-messages-import/gmail-messages-import-v2.service.ts b/packages/twenty-server/src/modules/messaging/services/gmail-messages-import/gmail-messages-import-v2.service.ts
index 5b86f30933e7..92d535e12345 100644
--- a/packages/twenty-server/src/modules/messaging/services/gmail-messages-import/gmail-messages-import-v2.service.ts
+++ b/packages/twenty-server/src/modules/messaging/services/gmail-messages-import/gmail-messages-import-v2.service.ts
@@ -11,11 +11,13 @@ import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decora
import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum';
import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service';
import { GMAIL_USERS_MESSAGES_GET_BATCH_SIZE } from 'src/modules/messaging/constants/gmail-users-messages-get-batch-size.constant';
-import { GMAIL_ONGOING_SYNC_TIMEOUT } from 'src/modules/messaging/constants/gmail-ongoing-sync-timeout.constant';
-import { GmailMessagesImportService } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import.service';
-import { SetMessageChannelSyncStatusService } from 'src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.service';
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
import { SaveMessagesAndEnqueueContactCreationService } from 'src/modules/messaging/services/gmail-messages-import/save-messages-and-enqueue-contact-creation.service';
+import { GmailErrorHandlingService } from 'src/modules/messaging/services/gmail-error-handling/gmail-error-handling.service';
+import { MessageChannelSyncStatusService } from 'src/modules/messaging/services/message-channel-sync-status/message-channel-sync-status.service';
+import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service';
+import { GmailMessagesImportService } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import.service';
+import { MessagingTelemetryService } from 'src/modules/messaging/services/telemetry/messaging-telemetry.service';
@Injectable()
export class GmailMessagesImportV2Service {
@@ -25,8 +27,11 @@ export class GmailMessagesImportV2Service {
private readonly fetchMessagesByBatchesService: FetchMessagesByBatchesService,
@InjectCacheStorage(CacheStorageNamespace.Messaging)
private readonly cacheStorage: CacheStorageService,
- private readonly setMessageChannelSyncStatusService: SetMessageChannelSyncStatusService,
+ private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService,
private readonly saveMessagesAndEnqueueContactCreationService: SaveMessagesAndEnqueueContactCreationService,
+ private readonly gmailErrorHandlingService: GmailErrorHandlingService,
+ private readonly googleAPIsRefreshAccessTokenService: GoogleAPIRefreshAccessTokenService,
+ private readonly messagingTelemetryService: MessagingTelemetryService,
) {}
async processMessageBatchImport(
@@ -34,30 +39,34 @@ export class GmailMessagesImportV2Service {
connectedAccount: ObjectRecord,
workspaceId: string,
) {
- if (messageChannel.syncSubStatus === MessageChannelSyncSubStatus.FAILED) {
- throw new Error(
- `Connected account ${connectedAccount.id} in workspace ${workspaceId} is in a failed state. Skipping...`,
- );
- }
-
if (
messageChannel.syncSubStatus !==
MessageChannelSyncSubStatus.MESSAGES_IMPORT_PENDING
) {
- throw new Error(
- `Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} is not pending.`,
- );
+ return;
}
- await this.setMessageChannelSyncStatusService.setMessagesImportOnGoingStatus(
- messageChannel.id,
+ await this.messagingTelemetryService.track({
+ eventName: 'messages_import.started',
workspaceId,
- );
+ connectedAccountId: messageChannel.connectedAccountId,
+ messageChannelId: messageChannel.id,
+ });
this.logger.log(
`Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} starting...`,
);
+ await this.messageChannelSyncStatusService.markAsMessagesImportOngoing(
+ messageChannel.id,
+ workspaceId,
+ );
+
+ await this.googleAPIsRefreshAccessTokenService.refreshAndSaveAccessToken(
+ workspaceId,
+ connectedAccount.id,
+ );
+
const messageIdsToFetch =
(await this.cacheStorage.setPop(
`messages-to-import:${workspaceId}:gmail:${messageChannel.id}`,
@@ -65,16 +74,15 @@ export class GmailMessagesImportV2Service {
)) ?? [];
if (!messageIdsToFetch?.length) {
- await this.setMessageChannelSyncStatusService.setCompletedStatus(
+ await this.messageChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch(
messageChannel.id,
workspaceId,
);
- this.logger.log(
- `Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} done with nothing to import or delete.`,
+ return await this.trackMessageImportCompleted(
+ messageChannel,
+ workspaceId,
);
-
- return;
}
const messageQueries = createQueriesFromMessageIds(messageIdsToFetch);
@@ -89,12 +97,15 @@ export class GmailMessagesImportV2Service {
);
if (!messagesToSave.length) {
- await this.setMessageChannelSyncStatusService.setCompletedStatus(
+ await this.messageChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch(
messageChannel.id,
workspaceId,
);
- return [];
+ return await this.trackMessageImportCompleted(
+ messageChannel,
+ workspaceId,
+ );
}
await this.saveMessagesAndEnqueueContactCreationService.saveMessagesAndEnqueueContactCreationJob(
@@ -105,42 +116,53 @@ export class GmailMessagesImportV2Service {
);
if (messageIdsToFetch.length < GMAIL_USERS_MESSAGES_GET_BATCH_SIZE) {
- await this.setMessageChannelSyncStatusService.setCompletedStatus(
+ await this.messageChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch(
messageChannel.id,
workspaceId,
);
-
- this.logger.log(
- `Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} done with no more messages to import.`,
- );
} else {
- await this.setMessageChannelSyncStatusService.setMessagesImportPendingStatus(
+ await this.messageChannelSyncStatusService.scheduleMessagesImport(
messageChannel.id,
workspaceId,
);
-
- this.logger.log(
- `Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} done with more messages to import.`,
- );
}
+
+ return await this.trackMessageImportCompleted(
+ messageChannel,
+ workspaceId,
+ );
} catch (error) {
await this.cacheStorage.setAdd(
`messages-to-import:${workspaceId}:gmail:${messageChannel.id}`,
messageIdsToFetch,
);
- await this.setMessageChannelSyncStatusService.setFailedUnkownStatus(
- messageChannel.id,
+ await this.gmailErrorHandlingService.handleGmailError(
+ {
+ code: error.code,
+ reason: error.errors?.[0]?.reason,
+ },
+ 'messages-import',
+ messageChannel,
workspaceId,
);
- this.logger.error(
- `Error fetching messages for ${connectedAccount.id} in workspace ${workspaceId}: locking for ${GMAIL_ONGOING_SYNC_TIMEOUT}ms...`,
- );
-
- throw new Error(
- `Error fetching messages for ${connectedAccount.id} in workspace ${workspaceId}: ${error.message}`,
+ return await this.trackMessageImportCompleted(
+ messageChannel,
+ workspaceId,
);
}
}
+
+ private async trackMessageImportCompleted(
+ messageChannel: ObjectRecord,
+ workspaceId: string,
+ ) {
+ await this.messagingTelemetryService.track({
+ eventName: 'messages_import.completed',
+ workspaceId,
+ connectedAccountId: messageChannel.connectedAccountId,
+ messageChannelId: messageChannel.id,
+ });
+ }
}
diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-messages-import/gmail-messages-import.module.ts b/packages/twenty-server/src/modules/messaging/services/gmail-messages-import/gmail-messages-import.module.ts
index 12b73f1e70fe..6aef6fcd2933 100644
--- a/packages/twenty-server/src/modules/messaging/services/gmail-messages-import/gmail-messages-import.module.ts
+++ b/packages/twenty-server/src/modules/messaging/services/gmail-messages-import/gmail-messages-import.module.ts
@@ -4,14 +4,17 @@ import { TypeOrmModule } from '@nestjs/typeorm';
import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
+import { GoogleAPIRefreshAccessTokenModule } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.module';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { FetchMessagesByBatchesModule } from 'src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.module';
+import { GmailErrorHandlingModule } from 'src/modules/messaging/services/gmail-error-handling/gmail-error-handling.module';
import { GmailMessagesImportV2Service } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import-v2.service';
import { GmailMessagesImportService } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import.service';
import { SaveMessagesAndEnqueueContactCreationService } from 'src/modules/messaging/services/gmail-messages-import/save-messages-and-enqueue-contact-creation.service';
+import { SetMessageChannelSyncStatusModule } from 'src/modules/messaging/services/message-channel-sync-status/message-channel-sync-status.module';
import { MessageParticipantModule } from 'src/modules/messaging/services/message-participant/message-participant.module';
import { MessageModule } from 'src/modules/messaging/services/message/message.module';
-import { SetMessageChannelSyncStatusModule } from 'src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.module';
+import { MessagingTelemetryModule } from 'src/modules/messaging/services/telemetry/messaging-telemetry.module';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
@Module({
@@ -27,6 +30,9 @@ import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-ob
MessageParticipantModule,
SetMessageChannelSyncStatusModule,
TypeOrmModule.forFeature([FeatureFlagEntity], 'core'),
+ GmailErrorHandlingModule,
+ GoogleAPIRefreshAccessTokenModule,
+ MessagingTelemetryModule,
],
providers: [
GmailMessagesImportService,
diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-get-history.service.ts b/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-get-history.service.ts
index e2953b0b8b0c..84373a240955 100644
--- a/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-get-history.service.ts
+++ b/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-get-history.service.ts
@@ -1,9 +1,10 @@
import { Injectable } from '@nestjs/common';
import { gmail_v1 } from 'googleapis';
+import { GaxiosResponse } from 'gaxios';
import { GMAIL_USERS_HISTORY_MAX_RESULT } from 'src/modules/messaging/constants/gmail-users-history-max-result.constant';
-import { GmailError } from 'src/modules/messaging/types/gmail-error';
+import { GmailError } from 'src/modules/messaging/services/gmail-error-handling/gmail-error-handling.service';
@Injectable()
export class GmailGetHistoryService {
@@ -21,38 +22,36 @@ export class GmailGetHistoryService {
let pageToken: string | undefined;
let hasMoreMessages = true;
let nextHistoryId: string | undefined;
+ let response: GaxiosResponse;
while (hasMoreMessages) {
try {
- const response = await gmailClient.users.history.list({
+ response = await gmailClient.users.history.list({
userId: 'me',
maxResults: GMAIL_USERS_HISTORY_MAX_RESULT,
pageToken,
startHistoryId: lastSyncHistoryId,
historyTypes: ['messageAdded', 'messageDeleted'],
});
-
- nextHistoryId = response?.data?.historyId ?? undefined;
-
- if (response?.data?.history) {
- fullHistory.push(...response.data.history);
- }
-
- pageToken = response?.data?.nextPageToken ?? undefined;
- hasMoreMessages = !!pageToken;
} catch (error) {
- const errorData = error?.response?.data?.error;
+ return {
+ history: [],
+ error: {
+ code: error.response?.status,
+ reason: error.response?.data?.error,
+ },
+ historyId: lastSyncHistoryId,
+ };
+ }
- if (errorData) {
- return {
- history: [],
- error: errorData,
- historyId: lastSyncHistoryId,
- };
- }
+ nextHistoryId = response?.data?.historyId ?? undefined;
- throw error;
+ if (response?.data?.history) {
+ fullHistory.push(...response.data.history);
}
+
+ pageToken = response?.data?.nextPageToken ?? undefined;
+ hasMoreMessages = !!pageToken;
}
return { history: fullHistory, historyId: nextHistoryId };
diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-error-handling.service.ts b/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-error-handling.service.ts
deleted file mode 100644
index 638ffffdb3ec..000000000000
--- a/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-error-handling.service.ts
+++ /dev/null
@@ -1,122 +0,0 @@
-import { Injectable, Logger } from '@nestjs/common';
-
-import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
-import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
-import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
-import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
-import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
-import {
- MessageChannelSyncStatus,
- MessageChannelSyncSubStatus,
- MessageChannelWorkspaceEntity,
-} from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
-import { GmailError } from 'src/modules/messaging/types/gmail-error';
-
-@Injectable()
-export class GmailPartialMessageListFetchErrorHandlingService {
- private readonly logger = new Logger(
- GmailPartialMessageListFetchErrorHandlingService.name,
- );
-
- constructor(
- @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
- private readonly connectedAccountRepository: ConnectedAccountRepository,
- @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
- private readonly messageChannelRepository: MessageChannelRepository,
- ) {}
-
- public async handleGmailError(
- error: GmailError | undefined,
- messageChannel: ObjectRecord,
- connectedAccountId: string,
- workspaceId: string,
- ): Promise {
- switch (error?.code) {
- case 404:
- this.logger.log(
- `404: Invalid lastSyncHistoryId for workspace ${workspaceId} and account ${connectedAccountId}, falling back to full sync.`,
- );
- await this.messageChannelRepository.resetSyncCursor(
- messageChannel.id,
- workspaceId,
- );
- await this.messageChannelRepository.updateSyncSubStatus(
- messageChannel.id,
- MessageChannelSyncSubStatus.FULL_MESSAGES_LIST_FETCH_PENDING,
- workspaceId,
- );
- break;
-
- case 429:
- this.logger.log(
- `429: rate limit reached for workspace ${workspaceId} and account ${connectedAccountId}: ${error.message}, import will be retried later.`,
- );
- await this.handleRateLimitExceeded(messageChannel, workspaceId);
- break;
-
- case 403:
- if (
- error?.errors?.[0]?.reason === 'rateLimitExceeded' ||
- error?.errors?.[0]?.reason === 'userRateLimitExceeded'
- ) {
- this.logger.log(
- `403:${
- error?.errors?.[0]?.reason === 'userRateLimitExceeded' && ' user'
- } rate limit exceeded for workspace ${workspaceId} and account ${connectedAccountId}: ${
- error.message
- }, import will be retried later.`,
- );
- this.handleRateLimitExceeded(messageChannel, workspaceId);
- } else {
- await this.handleInsufficientPermissions(
- error,
- messageChannel,
- workspaceId,
- );
- }
- break;
-
- case 401:
- this.handleInsufficientPermissions(error, messageChannel, workspaceId);
- break;
-
- default:
- break;
- }
- }
-
- public async handleRateLimitExceeded(
- messageChannel: MessageChannelWorkspaceEntity,
- workspaceId: string,
- ): Promise {
- await this.messageChannelRepository.updateSyncSubStatus(
- messageChannel.id,
- MessageChannelSyncSubStatus.PARTIAL_MESSAGES_LIST_FETCH_PENDING,
- workspaceId,
- );
- }
-
- public async handleInsufficientPermissions(
- error: GmailError,
- messageChannel: MessageChannelWorkspaceEntity,
- workspaceId: string,
- ): Promise {
- this.logger.error(
- `{error?.code}: ${error.message} for workspace ${workspaceId} and account ${messageChannel.connectedAccount.id}`,
- );
- await this.messageChannelRepository.updateSyncStatus(
- messageChannel.id,
- MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS,
- workspaceId,
- );
- await this.messageChannelRepository.updateSyncSubStatus(
- messageChannel.id,
- MessageChannelSyncSubStatus.PARTIAL_MESSAGES_LIST_FETCH_PENDING,
- workspaceId,
- );
- await this.connectedAccountRepository.updateAuthFailedAt(
- messageChannel.connectedAccount.id,
- workspaceId,
- );
- }
-}
diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-v2.service.ts b/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-v2.service.ts
index b3e85fb2db85..31d5db0f4a19 100644
--- a/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-v2.service.ts
+++ b/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-v2.service.ts
@@ -12,10 +12,10 @@ import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decora
import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum';
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel-message-association.workspace-entity';
import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/repositories/message-channel-message-association.repository';
-import { GmailPartialMessageListFetchErrorHandlingService } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-error-handling.service';
import { GmailGetHistoryService } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-get-history.service';
-import { SetMessageChannelSyncStatusService } from 'src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.service';
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
+import { GmailErrorHandlingService } from 'src/modules/messaging/services/gmail-error-handling/gmail-error-handling.service';
+import { MessageChannelSyncStatusService } from 'src/modules/messaging/services/message-channel-sync-status/message-channel-sync-status.service';
@Injectable()
export class GmailPartialMessageListFetchV2Service {
@@ -33,9 +33,9 @@ export class GmailPartialMessageListFetchV2Service {
MessageChannelMessageAssociationWorkspaceEntity,
)
private readonly messageChannelMessageAssociationRepository: MessageChannelMessageAssociationRepository,
- private readonly gmailPartialMessageListFetchErrorHandlingService: GmailPartialMessageListFetchErrorHandlingService,
+ private readonly gmailErrorHandlingService: GmailErrorHandlingService,
private readonly gmailGetHistoryService: GmailGetHistoryService,
- private readonly setMessageChannelSyncStatusService: SetMessageChannelSyncStatusService,
+ private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService,
) {}
public async processMessageListFetch(
@@ -43,30 +43,13 @@ export class GmailPartialMessageListFetchV2Service {
connectedAccount: ObjectRecord,
workspaceId: string,
): Promise {
- this.logger.log(
- `Fetching partial message list for workspace ${workspaceId} and account ${connectedAccount.id}`,
- );
-
- await this.setMessageChannelSyncStatusService.setMessageListFetchOnGoingStatus(
+ await this.messageChannelSyncStatusService.markAsMessagesListFetchOngoing(
messageChannel.id,
workspaceId,
);
const lastSyncHistoryId = messageChannel.syncCursor;
- if (!lastSyncHistoryId) {
- this.logger.log(
- `No lastSyncHistoryId for workspace ${workspaceId} and account ${connectedAccount.id}, falling back to full sync.`,
- );
-
- await this.setMessageChannelSyncStatusService.setFullMessageListFetchPendingStatus(
- messageChannel.id,
- workspaceId,
- );
-
- return;
- }
-
const gmailClient: gmail_v1.Gmail =
await this.gmailClientProvider.getGmailClient(
connectedAccount.refreshToken,
@@ -79,11 +62,11 @@ export class GmailPartialMessageListFetchV2Service {
);
if (error) {
- await this.gmailPartialMessageListFetchErrorHandlingService.handleGmailError(
+ await this.gmailErrorHandlingService.handleGmailError(
error,
+ 'partial-message-list-fetch',
messageChannel,
workspaceId,
- connectedAccount.id,
);
return;
@@ -100,7 +83,7 @@ export class GmailPartialMessageListFetchV2Service {
`Partial message list import done with history ${historyId} and nothing to update for workspace ${workspaceId} and account ${connectedAccount.id}`,
);
- await this.setMessageChannelSyncStatusService.setCompletedStatus(
+ await this.messageChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch(
messageChannel.id,
workspaceId,
);
@@ -136,15 +119,7 @@ export class GmailPartialMessageListFetchV2Service {
workspaceId,
);
- this.logger.log(
- `Updated lastSyncCursor to ${historyId} for workspace ${workspaceId} and account ${connectedAccount.id}`,
- );
-
- this.logger.log(
- `Partial message list import done with history ${historyId} for workspace ${workspaceId} and account ${connectedAccount.id}`,
- );
-
- await this.setMessageChannelSyncStatusService.setMessagesImportPendingStatus(
+ await this.messageChannelSyncStatusService.scheduleMessagesImport(
messageChannel.id,
workspaceId,
);
diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.module.ts b/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.module.ts
index ce288a5b78f5..99168cadcdfd 100644
--- a/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.module.ts
+++ b/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.module.ts
@@ -7,13 +7,13 @@ import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/works
import { BlocklistWorkspaceEntity } from 'src/modules/connected-account/standard-objects/blocklist.workspace-entity';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { FetchMessagesByBatchesModule } from 'src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.module';
+import { GmailErrorHandlingModule } from 'src/modules/messaging/services/gmail-error-handling/gmail-error-handling.module';
import { GmailGetHistoryService } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-get-history.service';
-import { GmailPartialMessageListFetchErrorHandlingService } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-error-handling.service';
import { GmailPartialMessageListFetchV2Service } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-v2.service';
import { GmailPartialMessageListFetchService } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.service';
+import { SetMessageChannelSyncStatusModule } from 'src/modules/messaging/services/message-channel-sync-status/message-channel-sync-status.module';
import { MessageModule } from 'src/modules/messaging/services/message/message.module';
import { MessagingProvidersModule } from 'src/modules/messaging/services/providers/messaging-providers.module';
-import { SetMessageChannelSyncStatusModule } from 'src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.module';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
@Module({
@@ -29,11 +29,11 @@ import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-ob
TypeOrmModule.forFeature([FeatureFlagEntity], 'core'),
WorkspaceDataSourceModule,
SetMessageChannelSyncStatusModule,
+ GmailErrorHandlingModule,
],
providers: [
GmailPartialMessageListFetchService,
GmailPartialMessageListFetchV2Service,
- GmailPartialMessageListFetchErrorHandlingService,
GmailGetHistoryService,
],
exports: [
diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.service.ts b/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.service.ts
index 7ffac02d7de7..779c6d6f3670 100644
--- a/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.service.ts
+++ b/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.service.ts
@@ -15,7 +15,6 @@ import {
MessageChannelSyncStatus,
} from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
import { GMAIL_USERS_HISTORY_MAX_RESULT } from 'src/modules/messaging/constants/gmail-users-history-max-result.constant';
-import { GmailError } from 'src/modules/messaging/types/gmail-error';
import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service';
import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator';
import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum';
@@ -293,7 +292,7 @@ export class GmailPartialMessageListFetchService {
): Promise<{
history: gmail_v1.Schema$History[];
historyId?: string | null;
- error?: GmailError;
+ error?: any;
}> {
const fullHistory: gmail_v1.Schema$History[] = [];
let pageToken: string | undefined;
diff --git a/packages/twenty-server/src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.module.ts b/packages/twenty-server/src/modules/messaging/services/message-channel-sync-status/message-channel-sync-status.module.ts
similarity index 63%
rename from packages/twenty-server/src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.module.ts
rename to packages/twenty-server/src/modules/messaging/services/message-channel-sync-status/message-channel-sync-status.module.ts
index 6d20cbac4f90..42e350d074c9 100644
--- a/packages/twenty-server/src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.module.ts
+++ b/packages/twenty-server/src/modules/messaging/services/message-channel-sync-status/message-channel-sync-status.module.ts
@@ -1,14 +1,14 @@
import { Module } from '@nestjs/common';
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
-import { SetMessageChannelSyncStatusService } from 'src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.service';
+import { MessageChannelSyncStatusService } from 'src/modules/messaging/services/message-channel-sync-status/message-channel-sync-status.service';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
@Module({
imports: [
ObjectMetadataRepositoryModule.forFeature([MessageChannelWorkspaceEntity]),
],
- providers: [SetMessageChannelSyncStatusService],
- exports: [SetMessageChannelSyncStatusService],
+ providers: [MessageChannelSyncStatusService],
+ exports: [MessageChannelSyncStatusService],
})
export class SetMessageChannelSyncStatusModule {}
diff --git a/packages/twenty-server/src/modules/messaging/services/message-channel-sync-status/message-channel-sync-status.service.ts b/packages/twenty-server/src/modules/messaging/services/message-channel-sync-status/message-channel-sync-status.service.ts
new file mode 100644
index 000000000000..522d71daec88
--- /dev/null
+++ b/packages/twenty-server/src/modules/messaging/services/message-channel-sync-status/message-channel-sync-status.service.ts
@@ -0,0 +1,156 @@
+import { Injectable } from '@nestjs/common';
+
+import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service';
+import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator';
+import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum';
+import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
+import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
+import {
+ MessageChannelWorkspaceEntity,
+ MessageChannelSyncSubStatus,
+ MessageChannelSyncStatus,
+} from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
+
+@Injectable()
+export class MessageChannelSyncStatusService {
+ constructor(
+ @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
+ private readonly messageChannelRepository: MessageChannelRepository,
+ @InjectCacheStorage(CacheStorageNamespace.Messaging)
+ private readonly cacheStorage: CacheStorageService,
+ ) {}
+
+ public async scheduleFullMessageListFetch(
+ messageChannelId: string,
+ workspaceId: string,
+ ) {
+ await this.messageChannelRepository.updateSyncSubStatus(
+ messageChannelId,
+ MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING,
+ workspaceId,
+ );
+ }
+
+ public async schedulePartialMessageListFetch(
+ messageChannelId: string,
+ workspaceId: string,
+ ) {
+ await this.messageChannelRepository.updateSyncSubStatus(
+ messageChannelId,
+ MessageChannelSyncSubStatus.PARTIAL_MESSAGE_LIST_FETCH_PENDING,
+ workspaceId,
+ );
+ }
+
+ public async scheduleMessagesImport(
+ messageChannelId: string,
+ workspaceId: string,
+ ) {
+ await this.messageChannelRepository.updateSyncSubStatus(
+ messageChannelId,
+ MessageChannelSyncSubStatus.MESSAGES_IMPORT_PENDING,
+ workspaceId,
+ );
+ }
+
+ public async resetAndScheduleFullMessageListFetch(
+ messageChannelId: string,
+ workspaceId: string,
+ ) {
+ await this.cacheStorage.setPop(
+ `messages-to-import:${workspaceId}:gmail:${messageChannelId}`,
+ );
+
+ // TODO: remove nextPageToken from cache
+
+ await this.messageChannelRepository.resetSyncCursor(
+ messageChannelId,
+ workspaceId,
+ );
+
+ await this.scheduleFullMessageListFetch(messageChannelId, workspaceId);
+ }
+
+ public async markAsMessagesListFetchOngoing(
+ messageChannelId: string,
+ workspaceId: string,
+ ) {
+ await this.messageChannelRepository.updateSyncSubStatus(
+ messageChannelId,
+ MessageChannelSyncSubStatus.MESSAGE_LIST_FETCH_ONGOING,
+ workspaceId,
+ );
+
+ await this.messageChannelRepository.updateSyncStatus(
+ messageChannelId,
+ MessageChannelSyncStatus.ONGOING,
+ workspaceId,
+ );
+ }
+
+ public async markAsCompletedAndSchedulePartialMessageListFetch(
+ messageChannelId: string,
+ workspaceId: string,
+ ) {
+ await this.messageChannelRepository.updateSyncStatus(
+ messageChannelId,
+ MessageChannelSyncStatus.COMPLETED,
+ workspaceId,
+ );
+
+ await this.schedulePartialMessageListFetch(messageChannelId, workspaceId);
+ }
+
+ public async markAsMessagesImportOngoing(
+ messageChannelId: string,
+ workspaceId: string,
+ ) {
+ await this.messageChannelRepository.updateSyncSubStatus(
+ messageChannelId,
+ MessageChannelSyncSubStatus.MESSAGES_IMPORT_ONGOING,
+ workspaceId,
+ );
+ }
+
+ public async markAsFailedUnknownAndFlushMessagesToImport(
+ messageChannelId: string,
+ workspaceId: string,
+ ) {
+ await this.cacheStorage.setPop(
+ `messages-to-import:${workspaceId}:gmail:${messageChannelId}`,
+ );
+
+ await this.messageChannelRepository.updateSyncSubStatus(
+ messageChannelId,
+ MessageChannelSyncSubStatus.FAILED,
+ workspaceId,
+ );
+
+ await this.messageChannelRepository.updateSyncStatus(
+ messageChannelId,
+ MessageChannelSyncStatus.FAILED_UNKNOWN,
+ workspaceId,
+ );
+ }
+
+ public async markAsFailedInsufficientPermissionsAndFlushMessagesToImport(
+ messageChannelId: string,
+ workspaceId: string,
+ ) {
+ await this.cacheStorage.setPop(
+ `messages-to-import:${workspaceId}:gmail:${messageChannelId}`,
+ );
+
+ await this.messageChannelRepository.updateSyncSubStatus(
+ messageChannelId,
+ MessageChannelSyncSubStatus.FAILED,
+ workspaceId,
+ );
+
+ await this.messageChannelRepository.updateSyncStatus(
+ messageChannelId,
+ MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS,
+ workspaceId,
+ );
+ }
+}
diff --git a/packages/twenty-server/src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.service.ts b/packages/twenty-server/src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.service.ts
deleted file mode 100644
index 4c24f43494e4..000000000000
--- a/packages/twenty-server/src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.service.ts
+++ /dev/null
@@ -1,101 +0,0 @@
-import { Injectable } from '@nestjs/common';
-
-import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
-import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
-import {
- MessageChannelWorkspaceEntity,
- MessageChannelSyncSubStatus,
- MessageChannelSyncStatus,
-} from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
-
-@Injectable()
-export class SetMessageChannelSyncStatusService {
- constructor(
- @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
- private readonly messageChannelRepository: MessageChannelRepository,
- ) {}
-
- public async setMessageListFetchOnGoingStatus(
- messageChannelId: string,
- workspaceId: string,
- ) {
- await this.messageChannelRepository.updateSyncSubStatus(
- messageChannelId,
- MessageChannelSyncSubStatus.MESSAGES_LIST_FETCH_ONGOING,
- workspaceId,
- );
-
- await this.messageChannelRepository.updateSyncStatus(
- messageChannelId,
- MessageChannelSyncStatus.ONGOING,
- workspaceId,
- );
- }
-
- public async setFullMessageListFetchPendingStatus(
- messageChannelId: string,
- workspaceId: string,
- ) {
- await this.messageChannelRepository.updateSyncSubStatus(
- messageChannelId,
- MessageChannelSyncSubStatus.FULL_MESSAGES_LIST_FETCH_PENDING,
- workspaceId,
- );
- }
-
- public async setCompletedStatus(
- messageChannelId: string,
- workspaceId: string,
- ) {
- await this.messageChannelRepository.updateSyncSubStatus(
- messageChannelId,
- MessageChannelSyncSubStatus.PARTIAL_MESSAGES_LIST_FETCH_PENDING,
- workspaceId,
- );
-
- await this.messageChannelRepository.updateSyncStatus(
- messageChannelId,
- MessageChannelSyncStatus.COMPLETED,
- workspaceId,
- );
- }
-
- public async setMessagesImportPendingStatus(
- messageChannelId: string,
- workspaceId: string,
- ) {
- await this.messageChannelRepository.updateSyncSubStatus(
- messageChannelId,
- MessageChannelSyncSubStatus.MESSAGES_IMPORT_PENDING,
- workspaceId,
- );
- }
-
- public async setMessagesImportOnGoingStatus(
- messageChannelId: string,
- workspaceId: string,
- ) {
- await this.messageChannelRepository.updateSyncSubStatus(
- messageChannelId,
- MessageChannelSyncSubStatus.MESSAGES_IMPORT_ONGOING,
- workspaceId,
- );
- }
-
- public async setFailedUnkownStatus(
- messageChannelId: string,
- workspaceId: string,
- ) {
- await this.messageChannelRepository.updateSyncSubStatus(
- messageChannelId,
- MessageChannelSyncSubStatus.FAILED,
- workspaceId,
- );
-
- await this.messageChannelRepository.updateSyncStatus(
- messageChannelId,
- MessageChannelSyncStatus.FAILED_UNKNOWN,
- workspaceId,
- );
- }
-}
diff --git a/packages/twenty-server/src/modules/messaging/services/telemetry/messaging-telemetry.module.ts b/packages/twenty-server/src/modules/messaging/services/telemetry/messaging-telemetry.module.ts
new file mode 100644
index 000000000000..f5bd69370a5e
--- /dev/null
+++ b/packages/twenty-server/src/modules/messaging/services/telemetry/messaging-telemetry.module.ts
@@ -0,0 +1,11 @@
+import { Module } from '@nestjs/common';
+
+import { AnalyticsModule } from 'src/engine/core-modules/analytics/analytics.module';
+import { MessagingTelemetryService } from 'src/modules/messaging/services/telemetry/messaging-telemetry.service';
+
+@Module({
+ imports: [AnalyticsModule],
+ providers: [MessagingTelemetryService],
+ exports: [MessagingTelemetryService],
+})
+export class MessagingTelemetryModule {}
diff --git a/packages/twenty-server/src/modules/messaging/services/telemetry/messaging-telemetry.service.ts b/packages/twenty-server/src/modules/messaging/services/telemetry/messaging-telemetry.service.ts
new file mode 100644
index 000000000000..4a70a7330216
--- /dev/null
+++ b/packages/twenty-server/src/modules/messaging/services/telemetry/messaging-telemetry.service.ts
@@ -0,0 +1,45 @@
+import { Injectable } from '@nestjs/common';
+
+import { AnalyticsService } from 'src/engine/core-modules/analytics/analytics.service';
+
+type MessagingTelemetryTrackInput = {
+ eventName: string;
+ workspaceId: string;
+ userId?: string;
+ connectedAccountId?: string;
+ messageChannelId?: string;
+ message?: string;
+};
+
+@Injectable()
+export class MessagingTelemetryService {
+ constructor(private readonly analyticsService: AnalyticsService) {}
+
+ public async track({
+ eventName,
+ workspaceId,
+ userId,
+ connectedAccountId,
+ messageChannelId,
+ message,
+ }: MessagingTelemetryTrackInput): Promise {
+ await this.analyticsService.create(
+ {
+ type: 'track',
+ data: {
+ eventName: `messaging.${eventName}`,
+ workspaceId,
+ userId,
+ connectedAccountId,
+ messageChannelId,
+ message,
+ },
+ },
+ userId,
+ workspaceId,
+ '', // voluntarely not retrieving this
+ '', // to avoid slowing down
+ '',
+ );
+ }
+}
diff --git a/packages/twenty-server/src/modules/messaging/standard-objects/message-channel.workspace-entity.ts b/packages/twenty-server/src/modules/messaging/standard-objects/message-channel.workspace-entity.ts
index e06b40f2f96d..f947498c7304 100644
--- a/packages/twenty-server/src/modules/messaging/standard-objects/message-channel.workspace-entity.ts
+++ b/packages/twenty-server/src/modules/messaging/standard-objects/message-channel.workspace-entity.ts
@@ -32,9 +32,9 @@ export enum MessageChannelSyncStatus {
}
export enum MessageChannelSyncSubStatus {
- FULL_MESSAGES_LIST_FETCH_PENDING = 'FULL_MESSAGES_LIST_FETCH_PENDING',
- PARTIAL_MESSAGES_LIST_FETCH_PENDING = 'PARTIAL_MESSAGES_LIST_FETCH_PENDING',
- MESSAGES_LIST_FETCH_ONGOING = 'MESSAGES_LIST_FETCH_ONGOING',
+ FULL_MESSAGE_LIST_FETCH_PENDING = 'FULL_MESSAGE_LIST_FETCH_PENDING',
+ PARTIAL_MESSAGE_LIST_FETCH_PENDING = 'PARTIAL_MESSAGE_LIST_FETCH_PENDING',
+ MESSAGE_LIST_FETCH_ONGOING = 'MESSAGE_LIST_FETCH_ONGOING',
MESSAGES_IMPORT_PENDING = 'MESSAGES_IMPORT_PENDING',
MESSAGES_IMPORT_ONGOING = 'MESSAGES_IMPORT_ONGOING',
FAILED = 'FAILED',
@@ -234,19 +234,19 @@ export class MessageChannelWorkspaceEntity extends BaseWorkspaceEntity {
icon: 'IconStatusChange',
options: [
{
- value: MessageChannelSyncSubStatus.FULL_MESSAGES_LIST_FETCH_PENDING,
+ value: MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING,
label: 'Full messages list fetch pending',
position: 0,
color: 'blue',
},
{
- value: MessageChannelSyncSubStatus.PARTIAL_MESSAGES_LIST_FETCH_PENDING,
+ value: MessageChannelSyncSubStatus.PARTIAL_MESSAGE_LIST_FETCH_PENDING,
label: 'Partial messages list fetch pending',
position: 1,
color: 'blue',
},
{
- value: MessageChannelSyncSubStatus.MESSAGES_LIST_FETCH_ONGOING,
+ value: MessageChannelSyncSubStatus.MESSAGE_LIST_FETCH_ONGOING,
label: 'Messages list fetch ongoing',
position: 2,
color: 'orange',
@@ -270,7 +270,7 @@ export class MessageChannelWorkspaceEntity extends BaseWorkspaceEntity {
color: 'red',
},
],
- defaultValue: `'${MessageChannelSyncSubStatus.FULL_MESSAGES_LIST_FETCH_PENDING}'`,
+ defaultValue: `'${MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING}'`,
})
syncSubStatus: MessageChannelSyncSubStatus;
diff --git a/packages/twenty-server/src/modules/messaging/types/gmail-error.ts b/packages/twenty-server/src/modules/messaging/types/gmail-error.ts
deleted file mode 100644
index ff715e217a66..000000000000
--- a/packages/twenty-server/src/modules/messaging/types/gmail-error.ts
+++ /dev/null
@@ -1,11 +0,0 @@
-export type GmailError = {
- code: number;
- errors: {
- domain: string;
- reason: string;
- message: string;
- locationType?: string;
- location?: string;
- }[];
- message: string;
-};