diff --git a/apps/web/app/api/outlook/webhook/process-history-item.ts b/apps/web/app/api/outlook/webhook/process-history-item.ts index 0c65914611..45c9c492f4 100644 --- a/apps/web/app/api/outlook/webhook/process-history-item.ts +++ b/apps/web/app/api/outlook/webhook/process-history-item.ts @@ -101,6 +101,7 @@ export async function processHistoryItem( message: { id: messageId, threadId: resourceData.conversationId || messageId, + conversationIndex: message.conversationIndex || undefined, headers: { from, to: to.join(","), @@ -216,6 +217,7 @@ export async function processHistoryItem( message: { id: messageId, threadId: resourceData.conversationId || messageId, + conversationIndex: message.conversationIndex || undefined, headers: { from, to: to.join(","), @@ -280,6 +282,7 @@ async function handleOutbound( const parsedMessage = { id: messageId, threadId: conversationId || messageId, + conversationIndex: message.conversationIndex || undefined, headers: messageHeaders, snippet: message.bodyPreview || "", historyId: message.id || messageId, diff --git a/apps/web/app/api/scheduled-actions/execute/route.ts b/apps/web/app/api/scheduled-actions/execute/route.ts index d21ac7b00b..6499c9366c 100644 --- a/apps/web/app/api/scheduled-actions/execute/route.ts +++ b/apps/web/app/api/scheduled-actions/execute/route.ts @@ -7,6 +7,7 @@ import { markQStashActionAsExecuting } from "@/utils/scheduled-actions/scheduler import { executeScheduledAction } from "@/utils/scheduled-actions/executor"; import prisma from "@/utils/prisma"; import { ScheduledActionStatus } from "@prisma/client"; +import { createEmailProvider } from "@/utils/email/provider"; const logger = createScopedLogger("scheduled-actions-executor"); @@ -46,7 +47,11 @@ export const POST = verifySignatureAppRouter( const scheduledAction = await prisma.scheduledAction.findUnique({ where: { id: payload.scheduledActionId }, include: { - emailAccount: true, + emailAccount: { + include: { + account: true, + }, + }, executedRule: true, }, }); @@ -85,8 +90,14 @@ export const POST = verifySignatureAppRouter( return new Response("Action already being processed", { status: 200 }); } - // Execute the action - const executionResult = await executeScheduledAction(scheduledAction); + const provider = await createEmailProvider({ + emailAccountId: scheduledAction.emailAccountId, + provider: scheduledAction.emailAccount.account.provider, + }); + const executionResult = await executeScheduledAction( + scheduledAction, + provider, + ); if (executionResult.success) { logger.info("Successfully executed QStash scheduled action", { diff --git a/apps/web/utils/ai/choose-rule/match-rules.ts b/apps/web/utils/ai/choose-rule/match-rules.ts index df7a5a29fe..ed0de691c6 100644 --- a/apps/web/utils/ai/choose-rule/match-rules.ts +++ b/apps/web/utils/ai/choose-rule/match-rules.ts @@ -27,6 +27,7 @@ import { extractEmailAddress } from "@/utils/email"; import { hasIcsAttachment } from "@/utils/parse/calender-event"; import { checkSenderReplyHistory } from "@/utils/reply-tracker/check-sender-reply-history"; import type { EmailProvider } from "@/utils/email/provider"; +import { isReplyInThread } from "@/utils/thread"; const logger = createScopedLogger("match-rules"); @@ -231,7 +232,12 @@ async function findMatchingRuleWithReasons( matchReasons?: MatchReason[]; reason?: string; }> { - const isThread = message.isReplyInThread; + const isThread = isReplyInThread( + message.id, + message.threadId, + message.conversationIndex, + ); + const { match, matchReasons, potentialMatches } = await findPotentialMatchingRules({ rules, diff --git a/apps/web/utils/mail.ts b/apps/web/utils/mail.ts index b8a5cd5f20..54004e8fcf 100644 --- a/apps/web/utils/mail.ts +++ b/apps/web/utils/mail.ts @@ -19,7 +19,6 @@ export function parseMessage( ...parsed, subject: parsed.headers?.subject || "", date: parsed.headers?.date || "", - isReplyInThread: !!(parsed.id && parsed.id !== parsed.threadId), }; } diff --git a/apps/web/utils/outlook/message.ts b/apps/web/utils/outlook/message.ts index ad29794f1a..0aff0e8f91 100644 --- a/apps/web/utils/outlook/message.ts +++ b/apps/web/utils/outlook/message.ts @@ -9,7 +9,7 @@ const logger = createScopedLogger("outlook/message"); // Cache for folder IDs let folderIdCache: Record | null = null; -function isOutlookReplyInThread( +export function isOutlookReplyInThread( conversationIndex?: string | undefined, ): boolean { try { @@ -287,9 +287,7 @@ async function convertMessages( return { id: message.id || "", threadId: message.conversationId || "", - isReplyInThread: isOutlookReplyInThread( - message.conversationIndex || "", - ), + conversationIndex: message.conversationIndex || undefined, snippet: message.bodyPreview || "", textPlain: message.body?.content || "", textHtml: message.body?.content || "", @@ -327,7 +325,7 @@ export async function getMessage( return { id: message.id || "", threadId: message.conversationId || "", - isReplyInThread: isOutlookReplyInThread(message.conversationIndex || ""), + conversationIndex: message.conversationIndex || "", snippet: message.bodyPreview || "", textPlain: message.body?.content || "", textHtml: message.body?.content || "", @@ -386,7 +384,7 @@ function parseOutlookMessage( return { id: message.id || "", threadId: message.conversationId || "", - isReplyInThread: atob(message.conversationIndex || "").length > 22, + conversationIndex: message.conversationIndex || "", snippet: message.bodyPreview || "", textPlain: message.body?.content || "", headers: { diff --git a/apps/web/utils/scheduled-actions/executor.ts b/apps/web/utils/scheduled-actions/executor.ts index f3bb5557cd..0852674fc7 100644 --- a/apps/web/utils/scheduled-actions/executor.ts +++ b/apps/web/utils/scheduled-actions/executor.ts @@ -14,13 +14,18 @@ import type { ActionItem, EmailForAction } from "@/utils/ai/types"; import type { ParsedMessage } from "@/utils/types"; import { getMessage } from "@/utils/gmail/message"; import { parseMessage } from "@/utils/mail"; +import { createEmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/provider"; const logger = createScopedLogger("scheduled-actions-executor"); /** * Execute a scheduled action */ -export async function executeScheduledAction(scheduledAction: ScheduledAction) { +export async function executeScheduledAction( + scheduledAction: ScheduledAction, + client: EmailProvider, +) { logger.info("Executing scheduled action", { scheduledActionId: scheduledAction.id, actionType: scheduledAction.actionType, @@ -37,16 +42,8 @@ export async function executeScheduledAction(scheduledAction: ScheduledAction) { throw new Error("Email account not found"); } - // Get Gmail client - const gmail = await getGmailClientWithRefresh({ - accessToken: emailAccount.tokens.access_token, - refreshToken: emailAccount.tokens.refresh_token, - expiresAt: emailAccount.tokens.expires_at, - emailAccountId: emailAccount.id, - }); - // Validate email still exists and get current state - const emailMessage = await validateEmailState(gmail, scheduledAction); + const emailMessage = await validateEmailState(client, scheduledAction); if (!emailMessage) { await markActionCompleted( scheduledAction.id, @@ -71,7 +68,7 @@ export async function executeScheduledAction(scheduledAction: ScheduledAction) { // Execute the action const executedAction = await executeDelayedAction({ - gmail, + client, actionItem, emailMessage, emailAccount: { @@ -108,11 +105,11 @@ export async function executeScheduledAction(scheduledAction: ScheduledAction) { * Validate that the email still exists and return current state */ async function validateEmailState( - gmail: gmail_v1.Gmail, + client: EmailProvider, scheduledAction: ScheduledAction, ): Promise { try { - const message = await getMessage(scheduledAction.messageId, gmail, "full"); + const message = await client.getMessage(scheduledAction.messageId); if (!message) { logger.info("Email no longer exists", { @@ -122,23 +119,18 @@ async function validateEmailState( return null; } - // Parse the message to get the correct format - const parsedMessage = parseMessage(message); - - // Convert to EmailForAction format const emailForAction: EmailForAction = { - threadId: parsedMessage.threadId, - id: parsedMessage.id, - headers: parsedMessage.headers, - textPlain: parsedMessage.textPlain || "", - textHtml: parsedMessage.textHtml || "", - attachments: parsedMessage.attachments || [], - internalDate: parsedMessage.internalDate, + threadId: message.threadId, + id: message.id, + headers: message.headers, + textPlain: message.textPlain || "", + textHtml: message.textHtml || "", + attachments: message.attachments || [], + internalDate: message.internalDate, }; return emailForAction; } catch (error: unknown) { - // Check for Gmail's standard "not found" error message if ( error instanceof Error && error.message === "Requested entity was not found." @@ -158,13 +150,13 @@ async function validateEmailState( * Execute the delayed action using existing action execution logic */ async function executeDelayedAction({ - gmail, + client, actionItem, emailMessage, emailAccount, scheduledAction, }: { - gmail: gmail_v1.Gmail; + client: EmailProvider; actionItem: ActionItem; emailMessage: EmailForAction; emailAccount: { email: string; userId: string; id: string }; @@ -210,6 +202,8 @@ async function executeDelayedAction({ snippet: "", historyId: "", inline: [], + subject: emailMessage.headers.subject || "", + date: emailMessage.headers.date || new Date().toISOString(), }; logger.info("Executing delayed action", { @@ -220,7 +214,7 @@ async function executeDelayedAction({ // Execute the specific action directly using runActionFunction await runActionFunction({ - gmail, + client, email: parsedMessage, action: executedAction, userEmail: emailAccount.email, diff --git a/apps/web/utils/thread.ts b/apps/web/utils/thread.ts new file mode 100644 index 0000000000..5a8e4ab18e --- /dev/null +++ b/apps/web/utils/thread.ts @@ -0,0 +1,14 @@ +import { isOutlookReplyInThread } from "@/utils/outlook/message"; + +// The first message id in a thread is the threadId +export function isReplyInThread( + messageId: string, + threadId: string, + conversationIndex: string | undefined, +): boolean { + if (conversationIndex) { + return isOutlookReplyInThread(conversationIndex); + } else { + return !!(messageId && messageId !== threadId); + } +} diff --git a/apps/web/utils/types.ts b/apps/web/utils/types.ts index ffce5f1b49..0e8f716785 100644 --- a/apps/web/utils/types.ts +++ b/apps/web/utils/types.ts @@ -47,7 +47,7 @@ export type ThreadWithPayloadMessages = gmail_v1.Schema$Thread & { export interface ParsedMessage extends gmail_v1.Schema$Message { id: string; threadId: string; - isReplyInThread: boolean; + conversationIndex?: string; labelIds?: string[]; snippet: string; historyId: string;