Skip to content

Commit

Permalink
Merge messages and threads #1 (#3583)
Browse files Browse the repository at this point in the history
* Merge messages and threads

* rename messageChannelSync to messageChannelMessage

* add merge logic

* remove deprecated methods

* restore enqueue GmailFullSyncJob after connectedAccount creation
  • Loading branch information
Weiko authored Jan 23, 2024
1 parent 23a3614 commit dc7fccb
Show file tree
Hide file tree
Showing 13 changed files with 291 additions and 337 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,22 @@ export const Threads = ({ entity }: { entity: ActivityTargetableObject }) => {
title={
<>
Inbox{' '}
<StyledEmailCount>{timelineThreads.length}</StyledEmailCount>
<StyledEmailCount>
{timelineThreads && timelineThreads.length}
</StyledEmailCount>
</>
}
fontColor={H1TitleFontColor.Primary}
/>
<Card>
{timelineThreads.map((thread: TimelineThread, index: number) => (
<ThreadPreview
key={index}
divider={index < timelineThreads.length - 1}
thread={thread}
/>
))}
{timelineThreads &&
timelineThreads.map((thread: TimelineThread, index: number) => (
<ThreadPreview
key={index}
divider={index < timelineThreads.length - 1}
thread={thread}
/>
))}
</Card>
</Section>
</StyledContainer>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ export class GoogleGmailService {
private readonly messageQueueService: MessageQueueService,
) {}

providerName = 'google';

async saveConnectedAccount(
saveConnectedAccountInput: SaveConnectedAccountInput,
) {
const {
handle,
workspaceId,
provider,
accessToken,
refreshToken,
workspaceMemberId,
Expand All @@ -43,7 +44,7 @@ export class GoogleGmailService {

const connectedAccount = await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "handle" = $1 AND "provider" = $2 AND "accountOwnerId" = $3`,
[handle, provider, workspaceMemberId],
[handle, this.providerName, workspaceMemberId],
);

if (connectedAccount.length > 0) {
Expand All @@ -60,7 +61,7 @@ export class GoogleGmailService {
[
connectedAccountId,
handle,
provider,
this.providerName,
accessToken,
refreshToken,
workspaceMemberId,
Expand All @@ -69,7 +70,7 @@ export class GoogleGmailService {

await manager.query(
`INSERT INTO ${dataSourceMetadata.schema}."messageChannel" ("visibility", "handle", "connectedAccountId", "type") VALUES ($1, $2, $3, $4)`,
['share_everything', handle, connectedAccountId, 'gmail'],
['share_everything', handle, connectedAccountId, 'email'],
);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { TimelineMessagingService } from 'src/core/messaging/timeline-messaging.

@Entity({ name: 'timelineThread', schema: 'core' })
@ObjectType('TimelineThread')
class TimelineThread {
export class TimelineThread {
@Field()
@Column()
read: boolean;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Injectable } from '@nestjs/common';

import { TimelineThread } from 'src/core/messaging/timeline-messaging.resolver';
import { TypeORMService } from 'src/database/typeorm/typeorm.service';
import { DataSourceService } from 'src/metadata/data-source/data-source.service';

Expand All @@ -10,7 +11,10 @@ export class TimelineMessagingService {
private readonly typeORMService: TypeORMService,
) {}

async getMessagesFromPersonIds(workspaceId: string, personIds: string[]) {
async getMessagesFromPersonIds(
workspaceId: string,
personIds: string[],
): Promise<TimelineThread[]> {
const dataSourceMetadata =
await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail(
workspaceId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,19 @@ export class GmailFullSyncService {
throw new Error('No refresh token found');
}

const gmailMessageChannel = await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."messageChannel" WHERE "connectedAccountId" = $1 AND "type" = 'email' LIMIT 1`,
[connectedAccountId],
);

if (!gmailMessageChannel.length) {
throw new Error(
`No gmail message channel found for connected account ${connectedAccountId}`,
);
}

const gmailMessageChannelId = gmailMessageChannel[0].id;

const gmailClient =
await this.gmailClientProvider.getGmailClient(refreshToken);

Expand All @@ -48,53 +61,29 @@ export class GmailFullSyncService {
return;
}

const { savedMessageIds, savedThreadIds } =
await this.utils.getSavedMessageIdsAndThreadIds(
messageExternalIds,
connectedAccountId,
dataSourceMetadata,
workspaceDataSource,
);

const messageIdsToSave = messageExternalIds.filter(
(messageId) => !savedMessageIds.includes(messageId),
);

const messageQueries =
this.utils.createQueriesFromMessageIds(messageIdsToSave);
this.utils.createQueriesFromMessageIds(messageExternalIds);

const { messages: messagesToSave, errors } =
await this.fetchMessagesByBatchesService.fetchAllMessages(
messageQueries,
accessToken,
);

const threads = this.utils.getThreadsFromMessages(messagesToSave);

const threadsToSave = threads.filter(
(threadId) => !savedThreadIds.includes(threadId.id),
);

await this.utils.saveMessageThreads(
threadsToSave,
dataSourceMetadata,
workspaceDataSource,
connectedAccount.id,
);
if (messagesToSave.length === 0) {
return;
}

await this.utils.saveMessages(
messagesToSave,
dataSourceMetadata,
workspaceDataSource,
connectedAccount,
gmailMessageChannelId,
);

if (errors.length) throw new Error('Error fetching messages');

if (messagesToSave.length === 0) {
return;
}

const lastModifiedMessageId = messagesData[0].id;

const historyId = messagesToSave.find(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,76 +111,42 @@ export class GmailPartialSyncService {
return;
}

const { messagesAdded, messagesDeleted } =
await this.getMessageIdsAndThreadIdsFromHistory(history);

const {
savedMessageIds: messagesAddedAlreadySaved,
savedThreadIds: threadsAddedAlreadySaved,
} = await this.utils.getSavedMessageIdsAndThreadIds(
messagesAdded,
connectedAccountId,
dataSourceMetadata,
workspaceDataSource,
);

const messageExternalIdsToSave = messagesAdded.filter(
(messageId) =>
!messagesAddedAlreadySaved.includes(messageId) &&
!messagesDeleted.includes(messageId),
const gmailMessageChannel = await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."messageChannel" WHERE "connectedAccountId" = $1 AND "type" = 'email' LIMIT 1`,
[connectedAccountId],
);

const { savedMessageIds: messagesDeletedAlreadySaved } =
await this.utils.getSavedMessageIdsAndThreadIds(
messagesDeleted,
connectedAccountId,
dataSourceMetadata,
workspaceDataSource,
if (!gmailMessageChannel.length) {
throw new Error(
`No gmail message channel found for connected account ${connectedAccountId}`,
);
}

const messageExternalIdsToDelete = messagesDeleted.filter((messageId) =>
messagesDeletedAlreadySaved.includes(messageId),
);
const gmailMessageChannelId = gmailMessageChannel[0].id;

const messageQueries = this.utils.createQueriesFromMessageIds(
messageExternalIdsToSave,
);
const { messagesAdded, messagesDeleted } =
await this.getMessageIdsAndThreadIdsFromHistory(history);

const messageQueries =
this.utils.createQueriesFromMessageIds(messagesAdded);

const { messages: messagesToSave, errors } =
await this.fetchMessagesByBatchesService.fetchAllMessages(
messageQueries,
accessToken,
);

const threads = this.utils.getThreadsFromMessages(messagesToSave);

const threadsToSave = threads.filter(
(thread) => !threadsAddedAlreadySaved.includes(thread.id),
);

await this.utils.saveMessageThreads(
threadsToSave,
dataSourceMetadata,
workspaceDataSource,
connectedAccount.id,
);

await this.utils.saveMessages(
messagesToSave,
dataSourceMetadata,
workspaceDataSource,
connectedAccount,
gmailMessageChannelId,
);

await this.utils.deleteMessages(
messageExternalIdsToDelete,
dataSourceMetadata,
workspaceDataSource,
);

await this.utils.deleteEmptyThreads(
await this.utils.deleteMessageChannelMessages(
messagesDeleted,
connectedAccountId,
gmailMessageChannelId,
dataSourceMetadata,
workspaceDataSource,
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export class GmailRefreshAccessTokenService {
}

const connectedAccounts = await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'gmail' AND "id" = $1`,
`SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'google' AND "id" = $1`,
[connectedAccountId],
);

Expand Down
Loading

0 comments on commit dc7fccb

Please sign in to comment.