Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

5531 update gmail full sync to v2 #5674

Merged
merged 69 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
c5d5dda
create cron for full sync
bosiraphael May 22, 2024
a92ab88
create full sync service v2
bosiraphael May 22, 2024
e907641
module imports
bosiraphael May 22, 2024
a1ff376
remove unnecessary transaction
bosiraphael May 22, 2024
7ad290f
remove unnecessary transaction
bosiraphael May 22, 2024
faedd8b
Merge branch 'main' into 5531-create-a-new-cron-fullsyncmessagelistfe…
bosiraphael May 23, 2024
f89f083
remove cron and command
bosiraphael May 23, 2024
47c197a
Merge branch 'main' into 5531-update-gmail-full-sync-to-v2
bosiraphael May 27, 2024
a897dfb
renaming
bosiraphael May 27, 2024
c71c400
renaming
bosiraphael May 27, 2024
d31f452
refactor
bosiraphael May 27, 2024
3644149
update status
bosiraphael May 27, 2024
c23be07
refactoring
bosiraphael May 27, 2024
7d95fd7
remove unused imports
bosiraphael May 28, 2024
4ddc7d4
update log
bosiraphael May 28, 2024
ea97d71
move error handling in its own module
bosiraphael May 28, 2024
0a7b383
refactoring error handling
bosiraphael May 28, 2024
d52cc69
refactoring
bosiraphael May 28, 2024
7db2a78
refactor error handling in full sync
bosiraphael May 28, 2024
5ac324c
refactoring
bosiraphael May 28, 2024
57107c7
add syncType argument to handleGmailError
bosiraphael May 28, 2024
2d700bc
fix module import
bosiraphael May 28, 2024
97d3820
fix type
bosiraphael May 29, 2024
ee87be7
Merge branch 'main' into 5531-update-gmail-full-sync-to-v2
bosiraphael May 29, 2024
933167a
update full-sync job
bosiraphael May 29, 2024
2803ffd
fix
bosiraphael May 29, 2024
df1188e
Refactoring
bosiraphael May 29, 2024
6fd273d
Fix error
bosiraphael May 29, 2024
ee0c51f
Rename constant
bosiraphael May 29, 2024
3c12fa6
remove unnecessary service
bosiraphael May 29, 2024
f9f6841
Set MessagesImportPendingStatus at the end of full-sync
bosiraphael May 29, 2024
6531b5c
fix status
bosiraphael May 29, 2024
568d750
Add 429 placeholder
bosiraphael May 29, 2024
17a246c
update statuses
bosiraphael May 29, 2024
a539581
remove duplicated code
bosiraphael May 29, 2024
20a0fbe
fix
bosiraphael May 29, 2024
abf70fe
add error code
bosiraphael May 30, 2024
b1d2991
update log
bosiraphael May 30, 2024
afa11cd
update statuses
bosiraphael May 30, 2024
7df4052
use setMessageChannelSyncStatusService in error handling
bosiraphael May 30, 2024
12f928d
refactoring
bosiraphael May 30, 2024
abcec13
Add worker to docker-compose
charlesBochet May 30, 2024
8e509ae
refactor message-channel-sync-status.service
bosiraphael May 30, 2024
216c5fd
refactor
bosiraphael May 30, 2024
6a5ff3b
Merge branch '5531-update-gmail-full-sync-to-v2' of github.com:twenty…
bosiraphael May 30, 2024
991f340
refactor SyncType
bosiraphael May 30, 2024
b1a3111
refactor message-channel-sync-status.service
bosiraphael May 30, 2024
1499d66
handle 401
bosiraphael May 30, 2024
8809928
move refreshing token inside gmail-message-import
bosiraphael May 30, 2024
65041ec
fix module import
bosiraphael May 30, 2024
caec6d4
refactoring
bosiraphael May 30, 2024
ce204bd
refactor error handling
bosiraphael May 30, 2024
44c8716
Start sending logs to telemetry on messaging
charlesBochet May 30, 2024
d07bc35
Start sending logs to telemetry on messaging
charlesBochet May 30, 2024
da56166
Fix
charlesBochet May 30, 2024
2fb2100
remove logs
bosiraphael May 30, 2024
de742d2
remove log
bosiraphael May 30, 2024
eafa1c1
Add telemetry
charlesBochet May 30, 2024
f51370e
Fix
charlesBochet May 30, 2024
f4ae89d
refactor
bosiraphael May 30, 2024
01de588
fix error reason for gmail messages import
bosiraphael May 30, 2024
df342cf
return after handling error
bosiraphael May 30, 2024
27a68ac
refactoring gmail-messages-import-v2 error handling
bosiraphael May 30, 2024
02c6440
remove try catch around token refresh
bosiraphael May 30, 2024
75a8cc3
fix typo in statuses
bosiraphael May 30, 2024
103c963
Add telemetry logs
charlesBochet May 30, 2024
27f3e03
Fix
charlesBochet May 30, 2024
0721212
Fixes
charlesBochet May 31, 2024
bdca23a
Fix
charlesBochet May 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -270,14 +270,6 @@ export class ConnectedAccountRepository {
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing the refresh token check might cause issues if other parts of the system assume a refresh token is always present.

}

const refreshToken = connectedAccount.refreshToken;

if (!refreshToken) {
throw new Error(
`No refresh token found for connected account ${connectedAccountId} in workspace ${workspaceId}`,
);
}

return connectedAccount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ import { Module } from '@nestjs/common';
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { SetMessageChannelSyncStatusModule } from 'src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.module';

@Module({
imports: [
ObjectMetadataRepositoryModule.forFeature([
ConnectedAccountWorkspaceEntity,
]),
SetMessageChannelSyncStatusModule,
],
providers: [GoogleAPIRefreshAccessTokenService],
exports: [GoogleAPIRefreshAccessTokenService],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,19 @@ import { EnvironmentService } from 'src/engine/integrations/environment/environm
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
import { SetMessageChannelSyncStatusService } from 'src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.service';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';

@Injectable()
export class GoogleAPIRefreshAccessTokenService {
constructor(
private readonly environmentService: EnvironmentService,
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
private readonly connectedAccountRepository: ConnectedAccountRepository,
private readonly setMessageChannelSyncStatusService: SetMessageChannelSyncStatusService,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
) {}

async refreshAndSaveAccessToken(
Expand Down Expand Up @@ -87,7 +93,26 @@ export class GoogleAPIRefreshAccessTokenService {
workspaceId,
);

throw new Error(`Error refreshing access token: ${error.message}`);
const messageChannel =
await this.messageChannelRepository.getFirstByConnectedAccountId(
connectedAccountId,
workspaceId,
);

if (!messageChannel) {
throw new Error(
`No message channel found for connected account ${connectedAccountId} in workspace ${workspaceId}`,
);
}

await this.setMessageChannelSyncStatusService.setFailedInsufficientPermissionsStatus(
messageChannel.id,
workspaceId,
);

throw new Error(
`Error refreshing access token: ${error.code}: ${error.message}`,
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { MessageQueue } from 'src/engine/integrations/message-queue/message-queu
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { GmailMessageListFetchCronJob } from 'src/modules/messaging/crons/jobs/gmail-message-list-fetch.cron.job';

const GMAIL_PARTIAL_SYNC_CRON_PATTERN = '*/5 * * * *';
const GMAIL_MESSAGE_LIST_FETCH_CRON_PATTERN = '*/5 * * * *';

@Command({
name: 'cron:messaging:gmail-message-list-fetch',
Expand All @@ -26,7 +26,7 @@ export class GmailMessageListFetchCronCommand extends CommandRunner {
GmailMessageListFetchCronJob.name,
undefined,
{
repeat: { pattern: GMAIL_PARTIAL_SYNC_CRON_PATTERN },
repeat: { pattern: GMAIL_MESSAGE_LIST_FETCH_CRON_PATTERN },
},
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export class BlocklistReimportMessagesJob
constructor(
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
private readonly connectedAccountRepository: ConnectedAccountRepository,
private readonly gmailFullSyncService: GmailFullMessageListFetchService,
private readonly gmailFullMessageListFetchService: GmailFullMessageListFetchService,
) {}

async handle(data: BlocklistReimportMessagesJobData): Promise<void> {
Expand All @@ -46,7 +46,7 @@ export class BlocklistReimportMessagesJob
return;
}

await this.gmailFullSyncService.fetchConnectedAccountThreads(
await this.gmailFullMessageListFetchService.fetchConnectedAccountThreads(
workspaceId,
connectedAccount[0].id,
[handle],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,23 @@
import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';

import { Repository } from 'typeorm';

import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';

import {
FeatureFlagEntity,
FeatureFlagKeys,
} from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service';
import { GmailFullMessageListFetchService } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.service';
import { GmailFullMessageListFetchV2Service } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch-v2.service';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
import { SetMessageChannelSyncStatusService } from 'src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.service';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';

export type GmailFullMessageListFetchJobData = {
workspaceId: string;
Expand All @@ -18,31 +32,101 @@ export class GmailFullMessageListFetchJob

constructor(
private readonly googleAPIsRefreshAccessTokenService: GoogleAPIRefreshAccessTokenService,
private readonly gmailFullSyncService: GmailFullMessageListFetchService,
private readonly gmailFullMessageListFetchService: GmailFullMessageListFetchService,
@InjectRepository(FeatureFlagEntity, 'core')
private readonly featureFlagRepository: Repository<FeatureFlagEntity>,
private readonly gmailFullMessageListFetchV2Service: GmailFullMessageListFetchV2Service,
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
private readonly connectedAccountRepository: ConnectedAccountRepository,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
private readonly setMessageChannelSyncStatusService: SetMessageChannelSyncStatusService,
) {}

async handle(data: GmailFullMessageListFetchJobData): Promise<void> {
const { workspaceId, connectedAccountId } = data;

this.logger.log(
`gmail full-sync for workspace ${data.workspaceId} and account ${data.connectedAccountId}`,
`gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId}`,
);

try {
await this.googleAPIsRefreshAccessTokenService.refreshAndSaveAccessToken(
data.workspaceId,
data.connectedAccountId,
workspaceId,
connectedAccountId,
);
} catch (e) {
this.logger.error(
`Error refreshing access token for connected account ${data.connectedAccountId} in workspace ${data.workspaceId}`,
`Error refreshing access token for connected account ${connectedAccountId} in workspace ${workspaceId}`,
e,
);

return;
}

await this.gmailFullSyncService.fetchConnectedAccountThreads(
data.workspaceId,
data.connectedAccountId,
);
const isGmailSyncV2EnabledFeatureFlag =
await this.featureFlagRepository.findOneBy({
workspaceId: workspaceId,
key: FeatureFlagKeys.IsGmailSyncV2Enabled,
value: true,
});

const isGmailSyncV2Enabled = isGmailSyncV2EnabledFeatureFlag?.value;

if (isGmailSyncV2Enabled) {
const connectedAccount = await this.connectedAccountRepository.getById(
connectedAccountId,
workspaceId,
);

if (!connectedAccount) {
throw new Error(
`Connected account ${connectedAccountId} not found in workspace ${workspaceId}`,
);
}

const messageChannel =
await this.messageChannelRepository.getFirstByConnectedAccountId(
connectedAccountId,
workspaceId,
);

if (!messageChannel) {
throw new Error(
`No message channel found for connected account ${connectedAccountId} in workspace ${workspaceId}`,
);
}

const refreshToken = connectedAccount.refreshToken;

if (!refreshToken) {
if (!connectedAccount.authFailedAt) {
await this.connectedAccountRepository.updateAuthFailedAt(
connectedAccountId,
workspaceId,
);
}

await this.setMessageChannelSyncStatusService.setFailedInsufficientPermissionsStatus(
messageChannel.id,
workspaceId,
);

throw new Error(
`No refresh token found for connected account ${connectedAccountId} in workspace ${workspaceId}`,
);
}

await this.gmailFullMessageListFetchV2Service.processMessageListFetch(
messageChannel,
connectedAccount,
workspaceId,
);
} else {
await this.gmailFullMessageListFetchService.fetchConnectedAccountThreads(
data.workspaceId,
data.connectedAccountId,
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,15 @@ import { MessageQueueJob } from 'src/engine/integrations/message-queue/interface

import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service';
import { GmailPartialMessageListFetchV2Service } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-v2.service';
import { GetConnectedAccountAndMessageChannelService } from 'src/modules/messaging/services/get-connected-account-and-message-channel/get-connected-account-and-message-channel.service';
import { MessageChannelSyncSubStatus } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
import { GmailFullMessageListFetchService } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.service';
import {
MessageChannelSyncSubStatus,
MessageChannelWorkspaceEntity,
} from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
import { GmailFullMessageListFetchV2Service } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch-v2.service';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';

export type GmailMessageListFetchJobData = {
workspaceId: string;
Expand All @@ -21,9 +27,12 @@ export class GmailMessageListFetchJob

constructor(
private readonly googleAPIsRefreshAccessTokenService: GoogleAPIRefreshAccessTokenService,
private readonly gmailFullSyncService: GmailFullMessageListFetchService,
private readonly gmailFullMessageListFetchV2Service: GmailFullMessageListFetchV2Service,
private readonly gmailPartialMessageListFetchV2Service: GmailPartialMessageListFetchV2Service,
private readonly getConnectedAccountAndMessageChannelService: GetConnectedAccountAndMessageChannelService,
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
private readonly connectedAccountRepository: ConnectedAccountRepository,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
) {}

async handle(data: GmailMessageListFetchJobData): Promise<void> {
Expand All @@ -47,12 +56,29 @@ export class GmailMessageListFetchJob
return;
}

const { messageChannel, connectedAccount } =
await this.getConnectedAccountAndMessageChannelService.getConnectedAccountAndMessageChannelOrThrow(
workspaceId,
const connectedAccount = await this.connectedAccountRepository.getById(
connectedAccountId,
workspaceId,
);

if (!connectedAccount) {
throw new Error(
`Connected account ${connectedAccountId} not found in workspace ${workspaceId}`,
);
}

const messageChannel =
await this.messageChannelRepository.getFirstByConnectedAccountId(
connectedAccountId,
workspaceId,
);

if (!messageChannel) {
throw new Error(
`No message channel found for connected account ${connectedAccountId} in workspace ${workspaceId}`,
);
}

switch (messageChannel.syncSubStatus) {
case MessageChannelSyncSubStatus.PARTIAL_MESSAGES_LIST_FETCH_PENDING:
try {
Expand All @@ -69,9 +95,10 @@ export class GmailMessageListFetchJob

case MessageChannelSyncSubStatus.FULL_MESSAGES_LIST_FETCH_PENDING:
try {
await this.gmailFullSyncService.fetchConnectedAccountThreads(
await this.gmailFullMessageListFetchV2Service.processMessageListFetch(
messageChannel,
connectedAccount,
workspaceId,
connectedAccountId,
);
} catch (e) {
this.logger.error(e);
Expand All @@ -87,7 +114,7 @@ export class GmailMessageListFetchJob
return;

default:
this.logger.error(
this.logger.log(
`Messaging import for workspace ${workspaceId} and account ${connectedAccountId} is locked, import will be retried later.`,
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import { GmailFullMessageListFetchJob } from 'src/modules/messaging/jobs/gmail-f
import { GmailMessageListFetchJob } from 'src/modules/messaging/jobs/gmail-message-list-fetch.job';
import { GmailPartialMessageListFetchJob } from 'src/modules/messaging/jobs/gmail-partial-message-list-fetch.job';
import { MessagingCreateCompanyAndContactAfterSyncJob } from 'src/modules/messaging/jobs/messaging-create-company-and-contact-after-sync.job';
import { GetConnectedAccountAndMessageChannelModule } from 'src/modules/messaging/services/get-connected-account-and-message-channel/get-connected-account-and-message-channel.module';
import { GmailFullMessageListFetchModule } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.module';
import { GmailPartialMessageListFetchModule } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.module';
import { SetMessageChannelSyncStatusModule } from 'src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.module';
import { ThreadCleanerModule } from 'src/modules/messaging/services/thread-cleaner/thread-cleaner.module';
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel-message-association.workspace-entity';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
Expand All @@ -37,7 +37,7 @@ import { MessageParticipantWorkspaceEntity } from 'src/modules/messaging/standar
GoogleAPIRefreshAccessTokenModule,
AutoCompaniesAndContactsCreationModule,
TypeOrmModule.forFeature([FeatureFlagEntity], 'core'),
GetConnectedAccountAndMessageChannelModule,
SetMessageChannelSyncStatusModule,
],
providers: [
{
Expand Down

This file was deleted.

Loading
Loading