Skip to content

Commit

Permalink
5531 update gmail full sync to v2 (twentyhq#5674)
Browse files Browse the repository at this point in the history
Closes twentyhq#5531

---------

Co-authored-by: Charles Bochet <[email protected]>
  • Loading branch information
bosiraphael and charlesBochet authored May 31, 2024
1 parent 7f27230 commit 50997a4
Show file tree
Hide file tree
Showing 39 changed files with 1,060 additions and 579 deletions.
2 changes: 2 additions & 0 deletions packages/twenty-docker/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,5 @@ STORAGE_TYPE=local
# STORAGE_S3_REGION=eu-west3
# STORAGE_S3_NAME=my-bucket
# STORAGE_S3_ENDPOINT=

MESSAGE_QUEUE_TYPE=pg-boss
28 changes: 28 additions & 0 deletions packages/twenty-docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ services:
PG_DATABASE_URL: postgres://twenty:twenty@${PG_DATABASE_HOST}/default
SERVER_URL: ${SERVER_URL}
FRONT_BASE_URL: ${FRONT_BASE_URL:-$SERVER_URL}
MESSAGE_QUEUE_TYPE: ${MESSAGE_QUEUE_TYPE}

ENABLE_DB_MIGRATIONS: "true"

Expand All @@ -35,6 +36,32 @@ services:
retries: 10
restart: always

worker:
image: twentycrm/twenty:${TAG}
volumes:
- worker-local-data:/app/${STORAGE_LOCAL_PATH:-.local-storage}
command: ["yarn", "worker:prod"]
environment:
PG_DATABASE_URL: postgres://twenty:twenty@${PG_DATABASE_HOST}/default
SERVER_URL: ${SERVER_URL}
FRONT_BASE_URL: ${FRONT_BASE_URL:-$SERVER_URL}
MESSAGE_QUEUE_TYPE: ${MESSAGE_QUEUE_TYPE}

ENABLE_DB_MIGRATIONS: "true"

STORAGE_TYPE: ${STORAGE_TYPE}
STORAGE_S3_REGION: ${STORAGE_S3_REGION}
STORAGE_S3_NAME: ${STORAGE_S3_NAME}
STORAGE_S3_ENDPOINT: ${STORAGE_S3_ENDPOINT}
ACCESS_TOKEN_SECRET: ${ACCESS_TOKEN_SECRET}
LOGIN_TOKEN_SECRET: ${LOGIN_TOKEN_SECRET}
REFRESH_TOKEN_SECRET: ${REFRESH_TOKEN_SECRET}
FILE_TOKEN_SECRET: ${FILE_TOKEN_SECRET}
depends_on:
db:
condition: service_healthy
restart: always

db:
image: twentycrm/twenty-postgres:${TAG}
volumes:
Expand All @@ -51,3 +78,4 @@ services:
volumes:
db-data:
server-local-data:
worker-local-data:
10 changes: 10 additions & 0 deletions packages/twenty-docs/docs/start/self-hosting/self-hosting.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@ import TabItem from '@theme/TabItem';

<DocCardList/>

# Setup Messaging & Calendar sync

Twenty offers integrations with Gmail and Google Calendar. To enable these features, you need to connect to register the following recurring jobs:
```
# from your worker container
yarn command:prod cron:messaging:gmail-messages-import
yarn command:prod cron:messaging:gmail-message-list-fetch
```

# Setup Environment Variables

## Frontend
Expand Down Expand Up @@ -209,3 +218,4 @@ import TabItem from '@theme/TabItem';
['CAPTCHA_SITE_KEY', '', 'The captcha site key'],
['CAPTCHA_SECRET_KEY', '', 'The captcha secret key'],
]}></OptionTable>

Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export const seedMessageChannel = async (
handle: '[email protected]',
visibility: 'share_everything',
syncSubStatus:
MessageChannelSyncSubStatus.FULL_MESSAGES_LIST_FETCH_PENDING,
MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING,
},
{
id: DEV_SEED_MESSAGE_CHANNEL_IDS.JONY,
Expand All @@ -56,7 +56,7 @@ export const seedMessageChannel = async (
handle: '[email protected]',
visibility: 'share_everything',
syncSubStatus:
MessageChannelSyncSubStatus.FULL_MESSAGES_LIST_FETCH_PENDING,
MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING,
},
{
id: DEV_SEED_MESSAGE_CHANNEL_IDS.PHIL,
Expand All @@ -69,7 +69,7 @@ export const seedMessageChannel = async (
handle: '[email protected]',
visibility: 'share_everything',
syncSubStatus:
MessageChannelSyncSubStatus.FULL_MESSAGES_LIST_FETCH_PENDING,
MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING,
},
])
.execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';

import { AnalyticsService } from 'src/engine/core-modules/analytics/analytics.service';
import { CreateAnalyticsInput } from 'src/engine/core-modules/analytics/dto/create-analytics.input';
import { ObjectRecordCreateEvent } from 'src/engine/integrations/event-emitter/types/object-record-create.event';

@Injectable()
Expand All @@ -11,14 +10,13 @@ export class TelemetryListener {

@OnEvent('*.created')
async handleAllCreate(payload: ObjectRecordCreateEvent<any>) {
this.analyticsService.create(
await this.analyticsService.create(
{
type: 'track',
name: payload.name,
data: JSON.parse(`{
"eventName": "${payload.name}"
}`),
} as CreateAnalyticsInput,
data: {
eventName: payload.name,
},
},
payload.userId,
payload.workspaceId,
'', // voluntarely not retrieving this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { User } from 'src/engine/core-modules/user/user.entity';
import { AnalyticsService } from './analytics.service';
import { Analytics } from './analytics.entity';

import { CreateAnalyticsInput } from './dto/create-analytics.input';
import { CreateAnalyticsInput } from './dtos/create-analytics.input';

@UseGuards(OptionalJwtAuthGuard)
@Resolver(() => Analytics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import { HttpService } from '@nestjs/axios';
import { anonymize } from 'src/utils/anonymize';
import { EnvironmentService } from 'src/engine/integrations/environment/environment.service';

import { CreateAnalyticsInput } from './dto/create-analytics.input';
type CreateEventInput = {
type: string;
data: object;
};

@Injectable()
export class AnalyticsService {
Expand All @@ -16,7 +19,7 @@ export class AnalyticsService {
) {}

async create(
createEventInput: CreateAnalyticsInput,
createEventInput: CreateEventInput,
userId: string | undefined,
workspaceId: string | undefined,
workspaceDisplayName: string | undefined,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,14 +270,6 @@ export class ConnectedAccountRepository {
);
}

const refreshToken = connectedAccount.refreshToken;

if (!refreshToken) {
throw new Error(
`No refresh token found for connected account ${connectedAccountId} in workspace ${workspaceId}`,
);
}

return connectedAccount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ import { Module } from '@nestjs/common';
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { GmailErrorHandlingModule } from 'src/modules/messaging/services/gmail-error-handling/gmail-error-handling.module';

@Module({
imports: [
ObjectMetadataRepositoryModule.forFeature([
ConnectedAccountWorkspaceEntity,
]),
GmailErrorHandlingModule,
],
providers: [GoogleAPIRefreshAccessTokenService],
exports: [GoogleAPIRefreshAccessTokenService],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,19 @@ import { EnvironmentService } from 'src/engine/integrations/environment/environm
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
import { GmailErrorHandlingService } from 'src/modules/messaging/services/gmail-error-handling/gmail-error-handling.service';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';

@Injectable()
export class GoogleAPIRefreshAccessTokenService {
constructor(
private readonly environmentService: EnvironmentService,
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
private readonly connectedAccountRepository: ConnectedAccountRepository,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
private readonly gmailErrorHandlingService: GmailErrorHandlingService,
) {}

async refreshAndSaveAccessToken(
Expand All @@ -30,12 +36,6 @@ export class GoogleAPIRefreshAccessTokenService {
);
}

if (connectedAccount.authFailedAt) {
throw new Error(
`Skipping refresh of access token for connected account ${connectedAccountId} in workspace ${workspaceId} because auth already failed, a new refresh token is needed`,
);
}

const refreshToken = connectedAccount.refreshToken;

if (!refreshToken) {
Expand All @@ -44,50 +44,55 @@ export class GoogleAPIRefreshAccessTokenService {
);
}

const accessToken = await this.refreshAccessToken(
refreshToken,
connectedAccountId,
workspaceId,
);

await this.connectedAccountRepository.updateAccessToken(
accessToken,
connectedAccountId,
workspaceId,
);
}

async refreshAccessToken(
refreshToken: string,
connectedAccountId: string,
workspaceId: string,
): Promise<string> {
try {
const response = await axios.post(
'https://oauth2.googleapis.com/token',
{
client_id: this.environmentService.get('AUTH_GOOGLE_CLIENT_ID'),
client_secret: this.environmentService.get(
'AUTH_GOOGLE_CLIENT_SECRET',
),
refresh_token: refreshToken,
grant_type: 'refresh_token',
},
{
headers: {
'Content-Type': 'application/json',
},
},
);
const accessToken = await this.refreshAccessToken(refreshToken);

return response.data.access_token;
} catch (error) {
await this.connectedAccountRepository.updateAuthFailedAt(
await this.connectedAccountRepository.updateAccessToken(
accessToken,
connectedAccountId,
workspaceId,
);
} catch (error) {
const messageChannel =
await this.messageChannelRepository.getFirstByConnectedAccountId(
connectedAccountId,
workspaceId,
);

throw new Error(`Error refreshing access token: ${error.message}`);
if (!messageChannel) {
throw new Error(
`No message channel found for connected account ${connectedAccountId} in workspace ${workspaceId}`,
);
}

await this.gmailErrorHandlingService.handleGmailError(
{
code: error.code,
reason: error.response.data.error,
},
'messages-import',
messageChannel,
workspaceId,
);
}
}

async refreshAccessToken(refreshToken: string): Promise<string> {
const response = await axios.post(
'https://oauth2.googleapis.com/token',
{
client_id: this.environmentService.get('AUTH_GOOGLE_CLIENT_ID'),
client_secret: this.environmentService.get('AUTH_GOOGLE_CLIENT_SECRET'),
refresh_token: refreshToken,
grant_type: 'refresh_token',
},
{
headers: {
'Content-Type': 'application/json',
},
},
);

return response.data.access_token;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { MessageQueue } from 'src/engine/integrations/message-queue/message-queu
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { GmailMessageListFetchCronJob } from 'src/modules/messaging/crons/jobs/gmail-message-list-fetch.cron.job';

const GMAIL_PARTIAL_SYNC_CRON_PATTERN = '*/5 * * * *';
const GMAIL_MESSAGE_LIST_FETCH_CRON_PATTERN = '*/5 * * * *';

@Command({
name: 'cron:messaging:gmail-message-list-fetch',
Expand All @@ -26,7 +26,7 @@ export class GmailMessageListFetchCronCommand extends CommandRunner {
GmailMessageListFetchCronJob.name,
undefined,
{
repeat: { pattern: GMAIL_PARTIAL_SYNC_CRON_PATTERN },
repeat: { pattern: GMAIL_MESSAGE_LIST_FETCH_CRON_PATTERN },
},
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,6 @@ export class GmailMessageListFetchCronJob
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
},
{
retryLimit: 2,
},
);
} else {
await this.messageQueueService.add<GmailPartialMessageListFetchJobData>(
Expand All @@ -110,9 +107,6 @@ export class GmailMessageListFetchCronJob
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
},
{
retryLimit: 2,
},
);
}
}
Expand Down
Loading

0 comments on commit 50997a4

Please sign in to comment.