diff --git a/apps/web/__tests__/ai/choose-rule/draft-management.test.ts b/apps/web/__tests__/ai/choose-rule/draft-management.test.ts index 6ce22deae8..b3357e73cb 100644 --- a/apps/web/__tests__/ai/choose-rule/draft-management.test.ts +++ b/apps/web/__tests__/ai/choose-rule/draft-management.test.ts @@ -4,7 +4,7 @@ import prisma from "@/utils/prisma"; import { ActionType } from "@prisma/client"; import { createScopedLogger } from "@/utils/logger"; import type { ParsedMessage } from "@/utils/types"; -import type { EmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/types"; vi.mock("@/utils/prisma", () => ({ default: { diff --git a/apps/web/__tests__/ai/reply/reply-context-collector.test.ts b/apps/web/__tests__/ai/reply/reply-context-collector.test.ts index ad172866ba..2c945f8395 100644 --- a/apps/web/__tests__/ai/reply/reply-context-collector.test.ts +++ b/apps/web/__tests__/ai/reply/reply-context-collector.test.ts @@ -1,7 +1,7 @@ -import { describe, expect, test, vi } from "vitest"; +import { afterEach, describe, expect, test, vi } from "vitest"; import { aiCollectReplyContext } from "@/utils/ai/reply/reply-context-collector"; import type { EmailForLLM, ParsedMessage } from "@/utils/types"; -import type { EmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/types"; import { getEmailAccount } from "@/__tests__/helpers"; // Run with: pnpm test-ai reply-context-collector @@ -12,6 +12,10 @@ const isAiTest = process.env.RUN_AI_TESTS === "true"; const TEST_TIMEOUT = 60_000; describe.runIf(isAiTest)("aiCollectReplyContext", () => { + afterEach(() => { + vi.clearAllMocks(); + }); + test( "collects historical context and returns relevant emails", async () => { diff --git a/apps/web/app/api/messages/batch/route.ts b/apps/web/app/api/messages/batch/route.ts index 882035adf8..4b7e0d6621 100644 --- a/apps/web/app/api/messages/batch/route.ts +++ b/apps/web/app/api/messages/batch/route.ts @@ -2,7 +2,7 @@ import { NextResponse } from "next/server"; import { withEmailProvider } from "@/utils/middleware"; import { messagesBatchQuery } from "@/app/api/messages/validation"; import { parseReply } from "@/utils/mail"; -import type { EmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/types"; export type MessagesBatchResponse = { messages: Awaited>; diff --git a/apps/web/app/api/messages/route.ts b/apps/web/app/api/messages/route.ts index 8f86d21687..0eff6b80a3 100644 --- a/apps/web/app/api/messages/route.ts +++ b/apps/web/app/api/messages/route.ts @@ -4,7 +4,7 @@ import { messageQuerySchema } from "@/app/api/messages/validation"; import { createScopedLogger } from "@/utils/logger"; import { isAssistantEmail } from "@/utils/assistant/is-assistant-email"; import { GmailLabel } from "@/utils/gmail/label"; -import type { EmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/types"; const logger = createScopedLogger("api/messages"); diff --git a/apps/web/app/api/outlook/webhook/process-history-item.ts b/apps/web/app/api/outlook/webhook/process-history-item.ts index fe88da9f7e..9ea6c2a451 100644 --- a/apps/web/app/api/outlook/webhook/process-history-item.ts +++ b/apps/web/app/api/outlook/webhook/process-history-item.ts @@ -21,7 +21,7 @@ import { } from "@/utils/reply-tracker/draft-tracking"; import { formatError } from "@/utils/error"; import { createEmailProvider } from "@/utils/email/provider"; -import type { EmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/types"; export async function processHistoryItem( resourceData: OutlookResourceData, diff --git a/apps/web/app/api/threads/[id]/route.ts b/apps/web/app/api/threads/[id]/route.ts index da998c4c4b..4e4099e649 100644 --- a/apps/web/app/api/threads/[id]/route.ts +++ b/apps/web/app/api/threads/[id]/route.ts @@ -2,7 +2,7 @@ import { z } from "zod"; import { NextResponse } from "next/server"; import { withEmailProvider } from "@/utils/middleware"; import { createScopedLogger } from "@/utils/logger"; -import type { EmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/types"; const threadQuery = z.object({ id: z.string() }); export type ThreadQuery = z.infer; diff --git a/apps/web/app/api/threads/route.ts b/apps/web/app/api/threads/route.ts index dd7b8b9b79..81976f2239 100644 --- a/apps/web/app/api/threads/route.ts +++ b/apps/web/app/api/threads/route.ts @@ -7,7 +7,7 @@ import { getCategory } from "@/utils/redis/category"; import { ExecutedRuleStatus } from "@prisma/client"; import { createScopedLogger } from "@/utils/logger"; import { isIgnoredSender } from "@/utils/filter-ignored-senders"; -import type { EmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/types"; const logger = createScopedLogger("api/threads"); diff --git a/apps/web/app/api/user/stats/newsletters/helpers.ts b/apps/web/app/api/user/stats/newsletters/helpers.ts index eb40baae9a..6dcb2d60e4 100644 --- a/apps/web/app/api/user/stats/newsletters/helpers.ts +++ b/apps/web/app/api/user/stats/newsletters/helpers.ts @@ -1,4 +1,4 @@ -import type { EmailProvider, EmailFilter } from "@/utils/email/provider"; +import type { EmailProvider, EmailFilter } from "@/utils/email/types"; import { extractEmailAddress } from "@/utils/email"; import prisma from "@/utils/prisma"; import { NewsletterStatus } from "@prisma/client"; diff --git a/apps/web/app/api/user/stats/newsletters/route.ts b/apps/web/app/api/user/stats/newsletters/route.ts index aa27dbc5d9..9193af0619 100644 --- a/apps/web/app/api/user/stats/newsletters/route.ts +++ b/apps/web/app/api/user/stats/newsletters/route.ts @@ -5,7 +5,7 @@ import { createScopedLogger } from "@/utils/logger"; import prisma from "@/utils/prisma"; import { Prisma } from "@prisma/client"; import { z } from "zod"; -import type { EmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/types"; import { getAutoArchiveFilters, findNewsletterStatus, diff --git a/apps/web/app/api/watch/controller.ts b/apps/web/app/api/watch/controller.ts index 30299f3ab4..ea707bf6eb 100644 --- a/apps/web/app/api/watch/controller.ts +++ b/apps/web/app/api/watch/controller.ts @@ -1,7 +1,7 @@ import prisma from "@/utils/prisma"; import { captureException } from "@/utils/error"; import { createScopedLogger } from "@/utils/logger"; -import type { EmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/types"; const logger = createScopedLogger("watch/controller"); diff --git a/apps/web/utils/ai/actions.ts b/apps/web/utils/ai/actions.ts index 1fa02884f5..5d4bae80d8 100644 --- a/apps/web/utils/ai/actions.ts +++ b/apps/web/utils/ai/actions.ts @@ -4,7 +4,7 @@ import { callWebhook } from "@/utils/webhook"; import type { ActionItem, EmailForAction } from "@/utils/ai/types"; import { coordinateReplyProcess } from "@/utils/reply-tracker/inbound"; import { internalDateToDate } from "@/utils/date"; -import type { EmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/types"; import { enqueueDigestItem } from "@/utils/digest/index"; const logger = createScopedLogger("ai-actions"); diff --git a/apps/web/utils/ai/choose-rule/choose-args.ts b/apps/web/utils/ai/choose-rule/choose-args.ts index d3d783018c..38e069edee 100644 --- a/apps/web/utils/ai/choose-rule/choose-args.ts +++ b/apps/web/utils/ai/choose-rule/choose-args.ts @@ -11,7 +11,7 @@ import { fetchMessagesAndGenerateDraft } from "@/utils/reply-tracker/generate-dr import { getEmailForLLM } from "@/utils/get-email-from-message"; import { aiGenerateArgs } from "@/utils/ai/choose-rule/ai-choose-args"; import { createScopedLogger } from "@/utils/logger"; -import type { EmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/types"; const logger = createScopedLogger("choose-args"); @@ -46,13 +46,27 @@ export async function getActionItemsWithAiArgs({ if (draftEmailActions.length) { try { + logger.info("Generating draft", { + email: emailAccount.email, + threadId: message.threadId, + }); + draft = await fetchMessagesAndGenerateDraft( emailAccount, message.threadId, client, ); + + logger.info("Draft generated", { + email: emailAccount.email, + threadId: message.threadId, + }); } catch (error) { - logger.error("Failed to generate draft", { error }); + logger.error("Failed to generate draft", { + email: emailAccount.email, + threadId: message.threadId, + error, + }); // Continue without draft if generation fails draft = null; } diff --git a/apps/web/utils/ai/choose-rule/draft-management.ts b/apps/web/utils/ai/choose-rule/draft-management.ts index 35e9818bf4..11ee11b575 100644 --- a/apps/web/utils/ai/choose-rule/draft-management.ts +++ b/apps/web/utils/ai/choose-rule/draft-management.ts @@ -2,7 +2,7 @@ import prisma from "@/utils/prisma"; import { ActionType } from "@prisma/client"; import type { ExecutedRule } from "@prisma/client"; import type { Logger } from "@/utils/logger"; -import type { EmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/types"; /** * Handles finding and potentially deleting a previous AI-generated draft for a thread. diff --git a/apps/web/utils/ai/choose-rule/execute.ts b/apps/web/utils/ai/choose-rule/execute.ts index e8217e24ab..c34b1c7553 100644 --- a/apps/web/utils/ai/choose-rule/execute.ts +++ b/apps/web/utils/ai/choose-rule/execute.ts @@ -5,7 +5,7 @@ import { ExecutedRuleStatus, ActionType } from "@prisma/client"; import { createScopedLogger } from "@/utils/logger"; import type { ParsedMessage } from "@/utils/types"; import { updateExecutedActionWithDraftId } from "@/utils/ai/choose-rule/draft-management"; -import type { EmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/types"; type ExecutedRuleWithActionItems = Prisma.ExecutedRuleGetPayload<{ include: { actionItems: true }; diff --git a/apps/web/utils/ai/choose-rule/match-rules.test.ts b/apps/web/utils/ai/choose-rule/match-rules.test.ts index 0554222ba3..586a7d0157 100644 --- a/apps/web/utils/ai/choose-rule/match-rules.test.ts +++ b/apps/web/utils/ai/choose-rule/match-rules.test.ts @@ -19,7 +19,7 @@ import type { ParsedMessage, ParsedMessageHeaders, } from "@/utils/types"; -import type { EmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/types"; import prisma from "@/utils/__mocks__/prisma"; import { aiChooseRule } from "@/utils/ai/choose-rule/ai-choose-rule"; import { getEmailAccount } from "@/__tests__/helpers"; diff --git a/apps/web/utils/ai/choose-rule/match-rules.ts b/apps/web/utils/ai/choose-rule/match-rules.ts index 0eb87b1350..bec5ec2a2d 100644 --- a/apps/web/utils/ai/choose-rule/match-rules.ts +++ b/apps/web/utils/ai/choose-rule/match-rules.ts @@ -26,7 +26,7 @@ import type { import { extractEmailAddress } from "@/utils/email"; import { hasIcsAttachment } from "@/utils/parse/calender-event"; import { checkSenderReplyHistory } from "@/utils/reply-tracker/check-sender-reply-history"; -import type { EmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/types"; import type { ModelType } from "@/utils/llms/model"; const logger = createScopedLogger("match-rules"); diff --git a/apps/web/utils/ai/choose-rule/run-rules.ts b/apps/web/utils/ai/choose-rule/run-rules.ts index 6c765ea202..0e584de5b4 100644 --- a/apps/web/utils/ai/choose-rule/run-rules.ts +++ b/apps/web/utils/ai/choose-rule/run-rules.ts @@ -21,7 +21,7 @@ import { cancelScheduledActions, } from "@/utils/scheduled-actions/scheduler"; import groupBy from "lodash/groupBy"; -import type { EmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/types"; import type { ModelType } from "@/utils/llms/model"; const logger = createScopedLogger("ai-run-rules"); diff --git a/apps/web/utils/ai/reply/reply-context-collector.ts b/apps/web/utils/ai/reply/reply-context-collector.ts index 0f3b509d77..09e928adf6 100644 --- a/apps/web/utils/ai/reply/reply-context-collector.ts +++ b/apps/web/utils/ai/reply/reply-context-collector.ts @@ -8,7 +8,7 @@ import type { EmailForLLM } from "@/utils/types"; import { stringifyEmail } from "@/utils/stringify-email"; import { getTodayForLLM } from "@/utils/llms/helpers"; import { getModel } from "@/utils/llms/model"; -import type { EmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/types"; import { getEmailForLLM } from "@/utils/get-email-from-message"; import { captureException } from "@/utils/error"; @@ -22,7 +22,7 @@ const resultSchema = z.object({ relevantEmails: z .array(z.string()) .describe( - "Relevant emails and the user's responses. One question+answer per array item.", + "Past email conversations from search results that could help draft the response. Leave empty if no relevant past emails found.", ), }); export type ReplyContextCollectorResult = z.infer; @@ -32,14 +32,17 @@ const agentSystem = `You are an intelligent email assistant that gathers histori Your task is to: 1. Analyze the current email thread to understand the main topic, question, or request 2. Search through the user's email history to find similar conversations from the past 6 months -3. Collect and synthesize the most relevant findings +3. Collect and synthesize the most relevant findings from your searches 4. When you are done, CALL finalizeResults with your final results You have access to these tools: - searchEmails: Search for emails using queries to find relevant historical context - finalizeResults: Finalize and return your results -Important guidelines: +CRITICAL GUIDELINES: +- The current email thread is already provided to the drafting agent - DO NOT include it in relevantEmails +- The relevantEmails array should ONLY contain past emails found through your searches that could help draft a response +- If no relevant past emails are found through searching, leave the relevantEmails array empty - Perform as many searches as needed to confidently gather context, but be efficient - Focus on emails that show how similar questions were answered before - Only include information that directly helps a downstream drafting agent @@ -126,6 +129,9 @@ ${getTodayForLLM()}`; return getEmailForLLM(message, { maxLength: 2000 }); }); + logger.info("Found emails", { emails: emails.length }); + // logger.trace("Found emails", { emails }); + return emails; } catch (error) { const errorMessage = @@ -148,9 +154,12 @@ ${getTodayForLLM()}`; inputSchema: resultSchema, execute: async (finalResult) => { logger.info("Finalizing results", { - notes: finalResult.notes, relevantEmails: finalResult.relevantEmails.length, }); + logger.trace("Finalizing results", { + notes: finalResult.notes, + relevantEmails: finalResult.relevantEmails, + }); result = finalResult; diff --git a/apps/web/utils/assess.ts b/apps/web/utils/assess.ts index 882903a528..2d4ff084ca 100644 --- a/apps/web/utils/assess.ts +++ b/apps/web/utils/assess.ts @@ -1,7 +1,7 @@ import uniq from "lodash/uniq"; import countBy from "lodash/countBy"; -import type { EmailProvider } from "@/utils/email/provider"; -import { GmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/types"; +import { GmailProvider } from "@/utils/email/google"; import { getEmailClient } from "@/utils/mail"; import { isDefined } from "@/utils/types"; import { createScopedLogger } from "@/utils/logger"; diff --git a/apps/web/utils/assistant/process-assistant-email.ts b/apps/web/utils/assistant/process-assistant-email.ts index 0d29ae6df5..ee2a7c22c1 100644 --- a/apps/web/utils/assistant/process-assistant-email.ts +++ b/apps/web/utils/assistant/process-assistant-email.ts @@ -6,7 +6,7 @@ import prisma from "@/utils/prisma"; import { emailToContent } from "@/utils/mail"; import { isAssistantEmail } from "@/utils/assistant/is-assistant-email"; import { internalDateToDate } from "@/utils/date"; -import type { EmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/types"; const logger = createScopedLogger("process-assistant-email"); diff --git a/apps/web/utils/categorize/senders/categorize.ts b/apps/web/utils/categorize/senders/categorize.ts index 4dd2d74c0f..9b136d9517 100644 --- a/apps/web/utils/categorize/senders/categorize.ts +++ b/apps/web/utils/categorize/senders/categorize.ts @@ -12,7 +12,7 @@ import type { EmailAccountWithAI } from "@/utils/llms/types"; import { createScopedLogger } from "@/utils/logger"; import { extractEmailAddress } from "@/utils/email"; import { SafeError } from "@/utils/error"; -import type { EmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/types"; const logger = createScopedLogger("categorize/senders"); diff --git a/apps/web/utils/cold-email/is-cold-email.test.ts b/apps/web/utils/cold-email/is-cold-email.test.ts index b7b3dc67f4..6a843917f2 100644 --- a/apps/web/utils/cold-email/is-cold-email.test.ts +++ b/apps/web/utils/cold-email/is-cold-email.test.ts @@ -3,7 +3,7 @@ import prisma from "@/utils/prisma"; import { ColdEmailSetting, ColdEmailStatus } from "@prisma/client"; import { blockColdEmailWithProvider } from "./is-cold-email"; import { getEmailAccount } from "@/__tests__/helpers"; -import type { EmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/types"; // Mock dependencies vi.mock("server-only", () => ({})); diff --git a/apps/web/utils/cold-email/is-cold-email.ts b/apps/web/utils/cold-email/is-cold-email.ts index aeb14b2032..d8015a7191 100644 --- a/apps/web/utils/cold-email/is-cold-email.ts +++ b/apps/web/utils/cold-email/is-cold-email.ts @@ -11,7 +11,7 @@ import { DEFAULT_COLD_EMAIL_PROMPT } from "@/utils/cold-email/prompt"; import { stringifyEmail } from "@/utils/stringify-email"; import { createScopedLogger } from "@/utils/logger"; import type { EmailForLLM } from "@/utils/types"; -import type { EmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/types"; import { getModel, type ModelType } from "@/utils/llms/model"; import { createGenerateObject } from "@/utils/llms"; diff --git a/apps/web/utils/email/google.ts b/apps/web/utils/email/google.ts new file mode 100644 index 0000000000..18f1c8335e --- /dev/null +++ b/apps/web/utils/email/google.ts @@ -0,0 +1,665 @@ +import type { gmail_v1 } from "@googleapis/gmail"; +import type { ParsedMessage } from "@/utils/types"; +import { parseMessage } from "@/utils/gmail/message"; +import { + getMessage as getGmailMessage, + getMessages as getGmailMessages, + getSentMessages as getGmailSentMessages, + hasPreviousCommunicationsWithSenderOrDomain, +} from "@/utils/gmail/message"; +import { + getLabels as getGmailLabels, + getLabelById as getGmailLabelById, + createLabel as createGmailLabel, + getOrCreateInboxZeroLabel as getOrCreateGmailInboxZeroLabel, + GmailLabel, +} from "@/utils/gmail/label"; +import { labelVisibility, messageVisibility } from "@/utils/gmail/constants"; +import type { InboxZeroLabel } from "@/utils/label"; +import type { ThreadsQuery } from "@/app/api/threads/validation"; +import { getMessageByRfc822Id } from "@/utils/gmail/message"; +import { + draftEmail as gmailDraftEmail, + forwardEmail as gmailForwardEmail, + replyToEmail as gmailReplyToEmail, + sendEmailWithPlainText as gmailSendEmailWithPlainText, +} from "@/utils/gmail/mail"; +import { + archiveThread as gmailArchiveThread, + getOrCreateLabel as gmailGetOrCreateLabel, + labelMessage as gmailLabelMessage, + markReadThread as gmailMarkReadThread, + removeThreadLabel as gmailRemoveThreadLabel, +} from "@/utils/gmail/label"; +import { trashThread as gmailTrashThread } from "@/utils/gmail/trash"; +import { markSpam as gmailMarkSpam } from "@/utils/gmail/spam"; +import { handlePreviousDraftDeletion } from "@/utils/ai/choose-rule/draft-management"; +import { + getThreadMessages as getGmailThreadMessages, + getThreadsFromSenderWithSubject as getGmailThreadsFromSenderWithSubject, +} from "@/utils/gmail/thread"; +import { getMessagesBatch } from "@/utils/gmail/message"; +import { getAccessTokenFromClient } from "@/utils/gmail/client"; +import { getGmailAttachment } from "@/utils/gmail/attachment"; +import { + getThreadsBatch, + getThreadsWithNextPageToken, +} from "@/utils/gmail/thread"; +import { decodeSnippet } from "@/utils/gmail/decode"; +import { + getAwaitingReplyLabel as getGmailAwaitingReplyLabel, + getReplyTrackingLabels, +} from "@/utils/reply-tracker/label"; +import { + getDraft as getGmailDraft, + deleteDraft as deleteGmailDraft, +} from "@/utils/gmail/draft"; +import { + getFiltersList as getGmailFiltersList, + createFilter as createGmailFilter, + deleteFilter as deleteGmailFilter, + createAutoArchiveFilter, +} from "@/utils/gmail/filter"; +import { processHistoryForUser as processGmailHistory } from "@/app/api/google/webhook/process-history"; +import { watchGmail, unwatchGmail } from "@/utils/gmail/watch"; +import type { + EmailProvider, + EmailThread, + EmailLabel, + EmailFilter, +} from "@/utils/email/types"; +import { createScopedLogger } from "@/utils/logger"; + +const logger = createScopedLogger("gmail-provider"); + +export class GmailProvider implements EmailProvider { + readonly name = "google"; + private client: gmail_v1.Gmail; + constructor(client: gmail_v1.Gmail) { + this.client = client; + } + + async getThreads(labelId?: string): Promise { + const response = await this.client.users.threads.list({ + userId: "me", + q: labelId ? `in:${labelId}` : undefined, + }); + + const threads = response.data.threads || []; + const threadPromises = threads.map((thread) => this.getThread(thread.id!)); + return Promise.all(threadPromises); + } + + async getThread(threadId: string): Promise { + const response = await this.client.users.threads.get({ + userId: "me", + id: threadId, + }); + + const messages = response.data.messages || []; + const messagePromises = messages.map((message) => + this.getMessage(message.id!), + ); + + return { + id: threadId, + messages: await Promise.all(messagePromises), + snippet: response.data.snippet || "", + historyId: response.data.historyId || undefined, + }; + } + + async getLabels(): Promise { + const labels = await getGmailLabels(this.client); + return (labels || []) + .filter( + (label) => + label.type === "user" && + label.labelListVisibility !== labelVisibility.labelHide, + ) + .map((label) => ({ + id: label.id!, + name: label.name!, + type: label.type!, + threadsTotal: label.threadsTotal || undefined, + labelListVisibility: label.labelListVisibility || undefined, + messageListVisibility: label.messageListVisibility || undefined, + })); + } + + async getLabelById(labelId: string): Promise { + try { + const label = await getGmailLabelById({ + gmail: this.client, + id: labelId, + }); + return { + id: label.id!, + name: label.name!, + type: label.type!, + threadsTotal: label.threadsTotal || undefined, + }; + } catch { + return null; + } + } + + async getMessage(messageId: string): Promise { + const message = await getGmailMessage(messageId, this.client, "full"); + return parseMessage(message); + } + + async getMessages(query?: string, maxResults = 50): Promise { + const response = await getGmailMessages(this.client, { + query, + maxResults, + }); + const messages = response.messages || []; + return messages + .filter((message) => message.payload) + .map((message) => parseMessage(message as any)); + } + + async getSentMessages(maxResults = 20): Promise { + return getGmailSentMessages(this.client, maxResults); + } + + async archiveThread(threadId: string, ownerEmail: string): Promise { + await gmailArchiveThread({ + gmail: this.client, + threadId, + ownerEmail, + actionSource: "automation", + }); + } + + async archiveThreadWithLabel( + threadId: string, + ownerEmail: string, + labelId?: string, + ): Promise { + await gmailArchiveThread({ + gmail: this.client, + threadId, + ownerEmail, + actionSource: "user", + labelId, + }); + } + + async trashThread( + threadId: string, + ownerEmail: string, + actionSource: "user" | "automation", + ): Promise { + await gmailTrashThread({ + gmail: this.client, + threadId, + ownerEmail, + actionSource, + }); + } + + async labelMessage(messageId: string, labelName: string): Promise { + const label = await gmailGetOrCreateLabel({ + gmail: this.client, + name: labelName, + }); + if (!label.id) + throw new Error("Label not found and unable to create label"); + await gmailLabelMessage({ + gmail: this.client, + messageId, + addLabelIds: [label.id], + }); + } + + async getDraft(draftId: string): Promise { + return getGmailDraft(draftId, this.client); + } + + async deleteDraft(draftId: string): Promise { + await deleteGmailDraft(this.client, draftId); + } + + async draftEmail( + email: ParsedMessage, + args: { to?: string; subject?: string; content: string }, + executedRule?: { id: string; threadId: string; emailAccountId: string }, + ): Promise<{ draftId: string }> { + if (executedRule) { + // Run draft creation and previous draft deletion in parallel + const [result] = await Promise.all([ + gmailDraftEmail(this.client, email, args), + handlePreviousDraftDeletion({ + client: this, + executedRule, + logger, + }), + ]); + return { draftId: result.data.id || "" }; + } else { + const result = await gmailDraftEmail(this.client, email, args); + return { draftId: result.data.id || "" }; + } + } + + async replyToEmail(email: ParsedMessage, content: string): Promise { + await gmailReplyToEmail(this.client, email, content); + } + + async sendEmail(args: { + to: string; + cc?: string; + bcc?: string; + subject: string; + messageText: string; + }): Promise { + await gmailSendEmailWithPlainText(this.client, args); + } + + async forwardEmail( + email: ParsedMessage, + args: { to: string; cc?: string; bcc?: string; content?: string }, + ): Promise { + await gmailForwardEmail(this.client, { messageId: email.id, ...args }); + } + + async markSpam(threadId: string): Promise { + await gmailMarkSpam({ gmail: this.client, threadId }); + } + + async markRead(threadId: string): Promise { + await gmailMarkReadThread({ + gmail: this.client, + threadId, + read: true, + }); + } + + async getThreadMessages(threadId: string): Promise { + return getGmailThreadMessages(threadId, this.client); + } + + async getPreviousConversationMessages( + messageIds: string[], + ): Promise { + return getMessagesBatch({ + messageIds, + accessToken: getAccessTokenFromClient(this.client), + }); + } + + async removeThreadLabel(threadId: string, labelId: string): Promise { + await gmailRemoveThreadLabel(this.client, threadId, labelId); + } + + async getAwaitingReplyLabel(): Promise { + return getGmailAwaitingReplyLabel(this.client); + } + + async createLabel(name: string): Promise { + const label = await createGmailLabel({ + gmail: this.client, + name, + messageListVisibility: messageVisibility.show, + labelListVisibility: labelVisibility.labelShow, + }); + + return { + id: label.id!, + name: label.name!, + type: label.type!, + }; + } + + async getOrCreateInboxZeroLabel(key: InboxZeroLabel): Promise { + const label = await getOrCreateGmailInboxZeroLabel({ + gmail: this.client, + key, + }); + return { + id: label.id!, + name: label.name!, + type: label.type!, + threadsTotal: label.threadsTotal || undefined, + }; + } + + async getOriginalMessage( + originalMessageId: string | undefined, + ): Promise { + if (!originalMessageId) return null; + const originalMessage = await getMessageByRfc822Id( + originalMessageId, + this.client, + ); + if (!originalMessage) return null; + return parseMessage(originalMessage); + } + + async getFiltersList(): Promise { + const response = await getGmailFiltersList({ gmail: this.client }); + return (response.data.filter || []).map((filter) => ({ + id: filter.id || "", + criteria: { + from: filter.criteria?.from || undefined, + }, + action: { + addLabelIds: filter.action?.addLabelIds || undefined, + removeLabelIds: filter.action?.removeLabelIds || undefined, + }, + })); + } + + async createFilter(options: { + from: string; + addLabelIds?: string[]; + removeLabelIds?: string[]; + }): Promise { + return createGmailFilter({ gmail: this.client, ...options }); + } + + async createAutoArchiveFilter(options: { + from: string; + gmailLabelId?: string; + }): Promise { + return createAutoArchiveFilter({ + gmail: this.client, + from: options.from, + gmailLabelId: options.gmailLabelId, + }); + } + + async deleteFilter(id: string): Promise { + return deleteGmailFilter({ gmail: this.client, id }); + } + + async getMessagesWithPagination(options: { + query?: string; + maxResults?: number; + pageToken?: string; + before?: Date; + after?: Date; + }): Promise<{ + messages: ParsedMessage[]; + nextPageToken?: string; + }> { + // Build query string for date filtering + let query = options.query || ""; + + if (options.before) { + query += ` before:${Math.floor(options.before.getTime() / 1000) + 1}`; + } + + if (options.after) { + query += ` after:${Math.floor(options.after.getTime() / 1000) - 1}`; + } + + query += ` -label:${GmailLabel.DRAFT}`; + + const response = await getGmailMessages(this.client, { + query: query.trim() || undefined, + maxResults: options.maxResults || 20, + pageToken: options.pageToken || undefined, + }); + + const messages = response.messages || []; + const messagePromises = messages.map((message) => + this.getMessage(message.id!), + ); + + return { + messages: await Promise.all(messagePromises), + nextPageToken: response.nextPageToken || undefined, + }; + } + + async getMessagesBatch(messageIds: string[]): Promise { + return getMessagesBatch({ + messageIds, + accessToken: getAccessTokenFromClient(this.client), + }); + } + + getAccessToken(): string { + return getAccessTokenFromClient(this.client); + } + + async markReadThread(threadId: string, read: boolean): Promise { + await gmailMarkReadThread({ + gmail: this.client, + threadId, + read, + }); + } + + async checkIfReplySent(senderEmail: string): Promise { + try { + const query = `from:me to:${senderEmail} label:sent`; + const response = await getGmailMessages(this.client, { + query, + maxResults: 1, + }); + const sent = (response.messages?.length ?? 0) > 0; + logger.info("Checked for sent reply", { senderEmail, sent }); + return sent; + } catch (error) { + logger.error("Error checking if reply was sent", { + error, + senderEmail, + }); + return true; // Default to true on error (safer for TO_REPLY filtering) + } + } + + async countReceivedMessages( + senderEmail: string, + threshold: number, + ): Promise { + try { + const query = `from:${senderEmail}`; + logger.info(`Checking received message count (up to ${threshold})`, { + senderEmail, + threshold, + }); + + // Fetch up to the threshold number of message IDs. + const response = await getGmailMessages(this.client, { + query, + maxResults: threshold, + }); + const count = response.messages?.length ?? 0; + + logger.info("Received message count check result", { + senderEmail, + count, + }); + return count; + } catch (error) { + logger.error("Error counting received messages", { + error, + senderEmail, + }); + return 0; // Default to 0 on error + } + } + + async getAttachment( + messageId: string, + attachmentId: string, + ): Promise<{ data: string; size: number }> { + const attachment = await getGmailAttachment( + this.client, + messageId, + attachmentId, + ); + return { + data: attachment.data || "", + size: attachment.size || 0, + }; + } + + async getThreadsWithQuery(options: { + query?: ThreadsQuery; + maxResults?: number; + pageToken?: string; + }): Promise<{ + threads: EmailThread[]; + nextPageToken?: string; + }> { + const query = options.query; + + function getQuery() { + if (query?.q) { + return query.q; + } + if (query?.fromEmail) { + return `from:${query.fromEmail}`; + } + if (query?.type === "archive") { + return `-label:${GmailLabel.INBOX}`; + } + return undefined; + } + + function getLabelIds(type?: string | null) { + switch (type) { + case "inbox": + return [GmailLabel.INBOX]; + case "sent": + return [GmailLabel.SENT]; + case "draft": + return [GmailLabel.DRAFT]; + case "trash": + return [GmailLabel.TRASH]; + case "spam": + return [GmailLabel.SPAM]; + case "starred": + return [GmailLabel.STARRED]; + case "important": + return [GmailLabel.IMPORTANT]; + case "unread": + return [GmailLabel.UNREAD]; + case "archive": + return undefined; + case "all": + return undefined; + default: + if (!type || type === "undefined" || type === "null") + return [GmailLabel.INBOX]; + return [type]; + } + } + + const { threads: gmailThreads, nextPageToken } = + await getThreadsWithNextPageToken({ + gmail: this.client, + q: getQuery(), + labelIds: query?.labelId + ? [query.labelId] + : getLabelIds(query?.type) || [], + maxResults: options.maxResults || 50, + pageToken: options.pageToken || undefined, + }); + + const threadIds = + gmailThreads?.map((t) => t.id).filter((id): id is string => !!id) || []; + const threads = await getThreadsBatch( + threadIds, + getAccessTokenFromClient(this.client), + ); + + const emailThreads: EmailThread[] = threads + .map((thread) => { + const id = thread.id; + if (!id) return null; + + const emailThread: EmailThread = { + id, + messages: + thread.messages?.map((message) => parseMessage(message as any)) || + [], + snippet: decodeSnippet(thread.snippet), + historyId: thread.historyId || undefined, + }; + return emailThread; + }) + .filter((thread): thread is EmailThread => thread !== null); + + return { + threads: emailThreads, + nextPageToken: nextPageToken || undefined, + }; + } + + async hasPreviousCommunicationsWithSenderOrDomain(options: { + from: string; + date: Date; + messageId: string; + }): Promise { + return hasPreviousCommunicationsWithSenderOrDomain(this, options); + } + + async getThreadsFromSenderWithSubject( + sender: string, + limit: number, + ): Promise> { + return getGmailThreadsFromSenderWithSubject( + this.client, + this.getAccessToken(), + sender, + limit, + ); + } + + async getReplyTrackingLabels(): Promise<{ + awaitingReplyLabelId: string; + needsReplyLabelId: string; + }> { + return getReplyTrackingLabels(this.client); + } + + async processHistory(options: { + emailAddress: string; + historyId?: number; + startHistoryId?: number; + subscriptionId?: string; + resourceData?: { + id: string; + conversationId?: string; + }; + }): Promise { + await processGmailHistory( + { + emailAddress: options.emailAddress, + historyId: options.historyId || 0, + }, + { + startHistoryId: options.startHistoryId?.toString(), + }, + ); + } + + async watchEmails(): Promise<{ + expirationDate: Date; + subscriptionId?: string; + } | null> { + const res = await watchGmail(this.client); + + if (res.expiration) { + const expirationDate = new Date(+res.expiration); + return { expirationDate }; + } + return null; + } + + async unwatchEmails(): Promise { + await unwatchGmail(this.client); + } + + // Gmail: The first message id in a thread is the threadId + isReplyInThread(message: ParsedMessage): boolean { + return !!(message.id && message.id !== message.threadId); + } +} diff --git a/apps/web/utils/email/microsoft.ts b/apps/web/utils/email/microsoft.ts new file mode 100644 index 0000000000..34b6eaedba --- /dev/null +++ b/apps/web/utils/email/microsoft.ts @@ -0,0 +1,822 @@ +import type { OutlookClient } from "@/utils/outlook/client"; +import type { ParsedMessage } from "@/utils/types"; +import { + getMessage as getOutlookMessage, + getMessages as getOutlookMessages, + queryBatchMessages as getOutlookBatchMessages, + getFolderIds, +} from "@/utils/outlook/message"; +import { + getLabels as getOutlookLabels, + createLabel as createOutlookLabel, + getOrCreateInboxZeroLabel as getOrCreateOutlookInboxZeroLabel, +} from "@/utils/outlook/label"; +import type { InboxZeroLabel } from "@/utils/label"; +import type { ThreadsQuery } from "@/app/api/threads/validation"; +import { + draftEmail as outlookDraftEmail, + forwardEmail as outlookForwardEmail, + replyToEmail as outlookReplyToEmail, + sendEmailWithPlainText as outlookSendEmailWithPlainText, +} from "@/utils/outlook/mail"; +import { + archiveThread as outlookArchiveThread, + getOrCreateLabel as outlookGetOrCreateLabel, + labelMessage as outlookLabelMessage, + markReadThread as outlookMarkReadThread, +} from "@/utils/outlook/label"; +import { trashThread as outlookTrashThread } from "@/utils/outlook/trash"; +import { markSpam as outlookMarkSpam } from "@/utils/outlook/spam"; +import { handlePreviousDraftDeletion } from "@/utils/ai/choose-rule/draft-management"; +import { createScopedLogger } from "@/utils/logger"; +import { + getThreadMessages as getOutlookThreadMessages, + getThreadsFromSenderWithSubject as getOutlookThreadsFromSenderWithSubject, +} from "@/utils/outlook/thread"; +import { getOutlookAttachment } from "@/utils/outlook/attachment"; +import { getOrCreateLabels as getOutlookOrCreateLabels } from "@/utils/outlook/label"; +import { + AWAITING_REPLY_LABEL_NAME, + NEEDS_REPLY_LABEL_NAME, +} from "@/utils/reply-tracker/consts"; +import { + getDraft as getOutlookDraft, + deleteDraft as deleteOutlookDraft, +} from "@/utils/outlook/draft"; +import { + getFiltersList as getOutlookFiltersList, + createFilter as createOutlookFilter, + deleteFilter as deleteOutlookFilter, + createAutoArchiveFilter as createOutlookAutoArchiveFilter, +} from "@/utils/outlook/filter"; +import { processHistoryForUser as processOutlookHistory } from "@/app/api/outlook/webhook/process-history"; +import { watchOutlook, unwatchOutlook } from "@/utils/outlook/watch"; +import type { + EmailProvider, + EmailThread, + EmailLabel, + EmailFilter, +} from "@/utils/email/types"; + +const logger = createScopedLogger("outlook-provider"); + +export class OutlookProvider implements EmailProvider { + readonly name = "microsoft-entra-id"; + private client: OutlookClient; + + constructor(client: OutlookClient) { + this.client = client; + } + + async getThreads(folderId?: string): Promise { + const messages = await this.getMessages(folderId); + const threadMap = new Map(); + + messages.forEach((message) => { + const threadId = message.threadId; + if (!threadMap.has(threadId)) { + threadMap.set(threadId, []); + } + threadMap.get(threadId)!.push(message); + }); + + return Array.from(threadMap.entries()).map(([id, messages]) => ({ + id, + messages, + snippet: messages[0]?.snippet || "", + })); + } + + async getThread(threadId: string): Promise { + const messages = await this.getMessages(`conversationId:${threadId}`); + return { + id: threadId, + messages, + snippet: messages[0]?.snippet || "", + }; + } + + async getLabels(): Promise { + const labels = await getOutlookLabels(this.client); + return labels.map((label) => ({ + id: label.id || "", + name: label.displayName || "", + type: "user", + })); + } + + async getLabelById(labelId: string): Promise { + const labels = await this.getLabels(); + return labels.find((label) => label.id === labelId) || null; + } + + async getMessage(messageId: string): Promise { + return getOutlookMessage(messageId, this.client); + } + + async getMessages(query?: string, maxResults = 50): Promise { + const allMessages: ParsedMessage[] = []; + let pageToken: string | undefined; + const pageSize = 20; // Outlook API limit + + while (allMessages.length < maxResults) { + const response = await getOutlookBatchMessages(this.client, { + query, + maxResults: Math.min(pageSize, maxResults - allMessages.length), + pageToken, + }); + + const messages = response.messages || []; + allMessages.push(...messages); + + // If we got fewer messages than requested, we've reached the end + if (messages.length < pageSize || !response.nextPageToken) { + break; + } + + pageToken = response.nextPageToken; + } + + return allMessages; + } + + async getSentMessages(maxResults = 20): Promise { + const folderIds = await getFolderIds(this.client); + const sentItemsFolderId = folderIds.sentitems; + + if (!sentItemsFolderId) { + logger.warn("Could not find sent items folder"); + return []; + } + + const response = await getOutlookBatchMessages(this.client, { + maxResults, + folderId: sentItemsFolderId, + }); + + return response.messages || []; + } + + async archiveThread(threadId: string, ownerEmail: string): Promise { + await outlookArchiveThread({ + client: this.client, + threadId, + ownerEmail, + actionSource: "automation", + }); + } + + async archiveThreadWithLabel( + threadId: string, + ownerEmail: string, + labelId?: string, + ): Promise { + await outlookArchiveThread({ + client: this.client, + threadId, + ownerEmail, + actionSource: "user", + labelId, + }); + } + + async trashThread( + threadId: string, + ownerEmail: string, + actionSource: "user" | "automation", + ): Promise { + await outlookTrashThread({ + client: this.client, + threadId, + ownerEmail, + actionSource, + }); + } + + async labelMessage(messageId: string, labelName: string): Promise { + const label = await outlookGetOrCreateLabel({ + client: this.client, + name: labelName, + }); + await outlookLabelMessage({ + client: this.client, + messageId, + categories: [label.displayName || ""], + }); + } + + async getDraft(draftId: string): Promise { + return getOutlookDraft(draftId, this.client); + } + + async deleteDraft(draftId: string): Promise { + await deleteOutlookDraft(this.client, draftId); + } + + async draftEmail( + email: ParsedMessage, + args: { to?: string; subject?: string; content: string }, + executedRule?: { id: string; threadId: string; emailAccountId: string }, + ): Promise<{ draftId: string }> { + if (executedRule) { + // Run draft creation and previous draft deletion in parallel + const [result] = await Promise.all([ + outlookDraftEmail(this.client, email, args), + handlePreviousDraftDeletion({ + client: this, + executedRule, + logger, + }), + ]); + return { draftId: result.id }; + } else { + const result = await outlookDraftEmail(this.client, email, args); + return { draftId: result.id }; + } + } + + async replyToEmail(email: ParsedMessage, content: string): Promise { + await outlookReplyToEmail(this.client, email, content); + } + + async sendEmail(args: { + to: string; + cc?: string; + bcc?: string; + subject: string; + messageText: string; + }): Promise { + await outlookSendEmailWithPlainText(this.client, args); + } + + async forwardEmail( + email: ParsedMessage, + args: { to: string; cc?: string; bcc?: string; content?: string }, + ): Promise { + await outlookForwardEmail(this.client, { messageId: email.id, ...args }); + } + + async markSpam(threadId: string): Promise { + await outlookMarkSpam(this.client, threadId); + } + + async markRead(threadId: string): Promise { + await outlookMarkReadThread({ + client: this.client, + threadId, + read: true, + }); + } + + async getThreadMessages(threadId: string): Promise { + return getOutlookThreadMessages(threadId, this.client); + } + + async getPreviousConversationMessages( + messageIds: string[], + ): Promise { + return this.getThreadMessages(messageIds[0]); + } + + async removeThreadLabel(_threadId: string, _labelId: string): Promise { + // For Outlook, we don't need to do anything with labels at this point + return Promise.resolve(); + } + + async createLabel(name: string): Promise { + const label = await createOutlookLabel({ + client: this.client, + name, + }); + + return { + id: label.id, + name: label.displayName || label.id, + type: "user", + }; + } + + async getOrCreateInboxZeroLabel(key: InboxZeroLabel): Promise { + const label = await getOrCreateOutlookInboxZeroLabel({ + client: this.client, + key, + }); + return { + id: label.id, + name: label.displayName || label.id, + type: "user", + }; + } + + async getOriginalMessage( + originalMessageId: string | undefined, + ): Promise { + if (!originalMessageId) return null; + try { + return await this.getMessage(originalMessageId); + } catch { + return null; + } + } + + async getFiltersList(): Promise { + try { + const response = await getOutlookFiltersList({ client: this.client }); + + const mappedFilters = (response.value || []).map( + (filter: { + id: string; + conditions: { senderContains: string[] }; + actions: { applyCategories: string[]; moveToFolder: string }; + }) => { + const mappedFilter = { + id: filter.id || "", + criteria: { + from: filter.conditions?.senderContains?.[0] || undefined, + }, + action: { + addLabelIds: filter.actions?.applyCategories || undefined, + removeLabelIds: filter.actions?.moveToFolder + ? ["INBOX"] + : undefined, + }, + }; + return mappedFilter; + }, + ); + + return mappedFilters; + } catch (error) { + logger.error("Error in Outlook getFiltersList", { error }); + throw error; + } + } + + async createFilter(options: { + from: string; + addLabelIds?: string[]; + removeLabelIds?: string[]; + }): Promise { + return createOutlookFilter({ client: this.client, ...options }); + } + + async createAutoArchiveFilter(options: { + from: string; + labelName?: string; + }): Promise { + return createOutlookAutoArchiveFilter({ + client: this.client, + from: options.from, + labelName: options.labelName, + }); + } + + async deleteFilter(id: string): Promise { + return deleteOutlookFilter({ client: this.client, id }); + } + + async getMessagesWithPagination(options: { + query?: string; + maxResults?: number; + pageToken?: string; + before?: Date; + after?: Date; + }): Promise<{ + messages: ParsedMessage[]; + nextPageToken?: string; + }> { + // For Outlook, we need to handle date filtering differently + // Microsoft Graph API uses different date filtering syntax + let query = options.query || ""; + + // Build date filter for Outlook + const dateFilters: string[] = []; + if (options.before) { + dateFilters.push(`receivedDateTime lt ${options.before.toISOString()}`); + } + if (options.after) { + dateFilters.push(`receivedDateTime gt ${options.after.toISOString()}`); + } + + // Combine date filters with existing query + if (dateFilters.length > 0) { + const dateFilter = dateFilters.join(" and "); + query = query ? `${query} and ${dateFilter}` : dateFilter; + } + + // Get folder IDs to get the inbox folder ID + const folderIds = await getFolderIds(this.client); + const inboxFolderId = folderIds.inbox; + + if (!inboxFolderId) { + throw new Error("Could not find inbox folder ID"); + } + + const response = await getOutlookBatchMessages(this.client, { + query: query.trim() || undefined, + maxResults: options.maxResults || 20, + pageToken: options.pageToken, + folderId: inboxFolderId, // Pass the inbox folder ID to match original behavior + }); + + return { + messages: response.messages || [], + nextPageToken: response.nextPageToken, + }; + } + + async getMessagesBatch(messageIds: string[]): Promise { + // For Outlook, we need to fetch messages individually since there's no batch endpoint + const messagePromises = messageIds.map((messageId) => + this.getMessage(messageId), + ); + return Promise.all(messagePromises); + } + + getAccessToken(): string { + return this.client.getAccessToken(); + } + + async markReadThread(threadId: string, read: boolean): Promise { + await outlookMarkReadThread({ + client: this.client, + threadId, + read, + }); + } + async checkIfReplySent(senderEmail: string): Promise { + try { + const query = `from:me to:${senderEmail}`; + const response = await getOutlookMessages(this.client, { + query, + maxResults: 1, + }); + const sent = (response.messages?.length ?? 0) > 0; + logger.info("Checked for sent reply", { senderEmail, sent }); + return sent; + } catch (error) { + logger.error("Error checking if reply was sent", { + error, + senderEmail, + }); + return true; // Default to true on error (safer for TO_REPLY filtering) + } + } + + async countReceivedMessages( + senderEmail: string, + threshold: number, + ): Promise { + try { + const query = `from:${senderEmail}`; + logger.info(`Checking received message count (up to ${threshold})`, { + senderEmail, + threshold, + }); + + // Fetch up to the threshold number of messages + const response = await getOutlookMessages(this.client, { + query, + maxResults: threshold, + }); + const count = response.messages?.length ?? 0; + + logger.info("Received message count check result", { + senderEmail, + count, + }); + return count; + } catch (error) { + logger.error("Error counting received messages", { + error, + senderEmail, + }); + return 0; // Default to 0 on error + } + } + + async getAttachment( + messageId: string, + attachmentId: string, + ): Promise<{ data: string; size: number }> { + const attachment = await getOutlookAttachment( + this.client, + messageId, + attachmentId, + ); + + // Outlook attachments return the data directly, not base64 encoded + // We need to convert it to base64 for consistency with Gmail + const data = attachment.contentBytes + ? Buffer.from(attachment.contentBytes, "base64").toString("base64") + : ""; + + return { + data, + size: attachment.size || 0, + }; + } + + async getThreadsWithQuery(options: { + query?: ThreadsQuery; + maxResults?: number; + pageToken?: string; + }): Promise<{ + threads: EmailThread[]; + nextPageToken?: string; + }> { + const query = options.query; + const client = this.client.getClient(); + + // Build the filter query for Microsoft Graph API + function getFilter() { + const filters: string[] = []; + + // Add folder filter based on type or labelId + if (query?.labelId) { + // Use labelId as parentFolderId (should be lowercase for Outlook) + filters.push(`parentFolderId eq '${query.labelId.toLowerCase()}'`); + } else if (query?.type === "all") { + // For "all" type, include both inbox and archive + filters.push( + "(parentFolderId eq 'inbox' or parentFolderId eq 'archive')", + ); + } else { + // Default to inbox only + filters.push("parentFolderId eq 'inbox'"); + } + + // Add other filters + if (query?.fromEmail) { + // Escape single quotes in email address + const escapedEmail = query.fromEmail.replace(/'/g, "''"); + filters.push(`from/emailAddress/address eq '${escapedEmail}'`); + } + + if (query?.q) { + // Escape single quotes in search query + const escapedQuery = query.q.replace(/'/g, "''"); + filters.push( + `(contains(subject,'${escapedQuery}') or contains(bodyPreview,'${escapedQuery}'))`, + ); + } + + return filters.length > 0 ? filters.join(" and ") : undefined; + } + + // Get messages from Microsoft Graph API + const endpoint = "/me/messages"; + + // Build the request + let request = client + .api(endpoint) + .select( + "id,conversationId,conversationIndex,subject,bodyPreview,from,toRecipients,receivedDateTime,isDraft,body,categories,parentFolderId", + ) + .top(options.maxResults || 50); + + // Add filter if present + const filter = getFilter(); + if (filter) { + request = request.filter(filter); + } + + // Only add ordering if we don't have a fromEmail filter to avoid complexity + if (!query?.fromEmail) { + request = request.orderby("receivedDateTime DESC"); + } + + // Handle pagination + if (options.pageToken) { + request = request.skipToken(options.pageToken); + } + + const response = await request.get(); + + // Sort messages by receivedDateTime if we filtered by fromEmail (since we couldn't use orderby) + let sortedMessages = response.value; + if (query?.fromEmail) { + sortedMessages = response.value.sort( + (a: { receivedDateTime: string }, b: { receivedDateTime: string }) => + new Date(b.receivedDateTime).getTime() - + new Date(a.receivedDateTime).getTime(), + ); + } + + // Group messages by conversationId to create threads + const messagesByThread = new Map< + string, + { + conversationId: string; + conversationIndex?: string; + id: string; + bodyPreview: string; + body: { content: string }; + from: { emailAddress: { address: string } }; + toRecipients: { emailAddress: { address: string } }[]; + receivedDateTime: string; + subject: string; + }[] + >(); + sortedMessages.forEach( + (message: { + conversationId: string; + id: string; + bodyPreview: string; + body: { content: string }; + from: { emailAddress: { address: string } }; + toRecipients: { emailAddress: { address: string } }[]; + receivedDateTime: string; + subject: string; + }) => { + // Skip messages without conversationId + if (!message.conversationId) { + logger.warn("Message missing conversationId", { + messageId: message.id, + }); + return; + } + + const messages = messagesByThread.get(message.conversationId) || []; + messages.push(message); + messagesByThread.set(message.conversationId, messages); + }, + ); + + // Convert to EmailThread format + const threads: EmailThread[] = Array.from(messagesByThread.entries()) + .filter(([_threadId, messages]) => messages.length > 0) // Filter out empty threads + .map(([threadId, messages]) => { + // Convert messages to ParsedMessage format + const parsedMessages: ParsedMessage[] = messages.map((message) => { + const subject = message.subject || ""; + const date = message.receivedDateTime || new Date().toISOString(); + + // Add proper null checks for from and toRecipients + const fromAddress = message.from?.emailAddress?.address || ""; + const toAddress = + message.toRecipients?.[0]?.emailAddress?.address || ""; + + return { + id: message.id || "", + threadId: message.conversationId || "", + snippet: message.bodyPreview || "", + textPlain: message.body?.content || "", + textHtml: message.body?.content || "", + headers: { + from: fromAddress, + to: toAddress, + subject, + date, + }, + subject, + date, + labelIds: [], + internalDate: date, + historyId: "", + inline: [], + conversationIndex: message.conversationIndex, + }; + }); + + return { + id: threadId, + messages: parsedMessages, + snippet: messages[0]?.bodyPreview || "", + }; + }); + + return { + threads, + nextPageToken: response["@odata.nextLink"] + ? new URL(response["@odata.nextLink"]).searchParams.get("$skiptoken") || + undefined + : undefined, + }; + } + + async hasPreviousCommunicationsWithSenderOrDomain(options: { + from: string; + date: Date; + messageId: string; + }): Promise { + try { + const response = await this.client + .getClient() + .api("/me/messages") + .filter( + `from/emailAddress/address eq '${options.from}' and receivedDateTime lt ${options.date.toISOString()}`, + ) + .top(2) + .select("id") + .get(); + + // Check if there are any messages from this sender before the current date + // and exclude the current message + const hasPreviousEmail = response.value.some( + (message: { id: string }) => message.id !== options.messageId, + ); + + return hasPreviousEmail; + } catch (error) { + logger.error("Error checking previous communications", { + error, + options, + }); + return false; + } + } + + async getThreadsFromSenderWithSubject( + sender: string, + limit: number, + ): Promise> { + return getOutlookThreadsFromSenderWithSubject(this.client, sender, limit); + } + + async getAwaitingReplyLabel(): Promise { + const [awaitingReplyLabel] = await getOutlookOrCreateLabels({ + client: this.client, + names: [AWAITING_REPLY_LABEL_NAME], + }); + + return awaitingReplyLabel.id || ""; + } + + async getReplyTrackingLabels(): Promise<{ + awaitingReplyLabelId: string; + needsReplyLabelId: string; + }> { + const [awaitingReplyLabel, needsReplyLabel] = + await getOutlookOrCreateLabels({ + client: this.client, + names: [AWAITING_REPLY_LABEL_NAME, NEEDS_REPLY_LABEL_NAME], + }); + + return { + awaitingReplyLabelId: awaitingReplyLabel.id || "", + needsReplyLabelId: needsReplyLabel.id || "", + }; + } + + async processHistory(options: { + emailAddress: string; + historyId?: number; + startHistoryId?: number; + subscriptionId?: string; + resourceData?: { + id: string; + conversationId?: string; + }; + }): Promise { + if (!options.subscriptionId) { + throw new Error( + "subscriptionId is required for Outlook history processing", + ); + } + + await processOutlookHistory({ + subscriptionId: options.subscriptionId, + resourceData: options.resourceData || { + id: options.historyId?.toString() || "0", + conversationId: options.startHistoryId?.toString(), + }, + }); + } + + async watchEmails(): Promise<{ + expirationDate: Date; + subscriptionId?: string; + } | null> { + const subscription = await watchOutlook(this.client.getClient()); + + if (subscription.expirationDateTime) { + const expirationDate = new Date(subscription.expirationDateTime); + return { + expirationDate, + subscriptionId: subscription.id, + }; + } + return null; + } + + async unwatchEmails(subscriptionId?: string): Promise { + if (!subscriptionId) { + logger.warn("No subscription ID provided for Outlook unwatch"); + return; + } + await unwatchOutlook(this.client.getClient(), subscriptionId); + } + + isReplyInThread(message: ParsedMessage): boolean { + try { + return atob(message.conversationIndex || "").length > 22; + } catch (error) { + logger.warn("Invalid conversationIndex base64", { + conversationIndex: message.conversationIndex, + error, + }); + return false; + } + } +} diff --git a/apps/web/utils/email/provider.ts b/apps/web/utils/email/provider.ts index 959617dd78..c30962f68d 100644 --- a/apps/web/utils/email/provider.ts +++ b/apps/web/utils/email/provider.ts @@ -1,1630 +1,10 @@ -import type { gmail_v1 } from "@googleapis/gmail"; -import type { OutlookClient } from "@/utils/outlook/client"; -import type { ParsedMessage } from "@/utils/types"; -import { parseMessage } from "@/utils/gmail/message"; -import { - getMessage as getGmailMessage, - getMessages as getGmailMessages, - getSentMessages as getGmailSentMessages, - hasPreviousCommunicationsWithSenderOrDomain, -} from "@/utils/gmail/message"; -import { - getMessage as getOutlookMessage, - getMessages as getOutlookMessages, - queryBatchMessages as getOutlookBatchMessages, - getFolderIds, -} from "@/utils/outlook/message"; -import { - getLabels as getGmailLabels, - getLabelById as getGmailLabelById, - createLabel as createGmailLabel, - getOrCreateInboxZeroLabel as getOrCreateGmailInboxZeroLabel, - GmailLabel, -} from "@/utils/gmail/label"; -import { - getLabels as getOutlookLabels, - createLabel as createOutlookLabel, - getOrCreateInboxZeroLabel as getOrCreateOutlookInboxZeroLabel, -} from "@/utils/outlook/label"; -import { labelVisibility, messageVisibility } from "@/utils/gmail/constants"; import { getGmailClientForEmail, getOutlookClientForEmail, } from "@/utils/account"; -import type { InboxZeroLabel } from "@/utils/label"; -import type { ThreadsQuery } from "@/app/api/threads/validation"; -import { getMessageByRfc822Id } from "@/utils/gmail/message"; -import { - draftEmail as gmailDraftEmail, - forwardEmail as gmailForwardEmail, - replyToEmail as gmailReplyToEmail, - sendEmailWithPlainText as gmailSendEmailWithPlainText, -} from "@/utils/gmail/mail"; -import { - draftEmail as outlookDraftEmail, - forwardEmail as outlookForwardEmail, - replyToEmail as outlookReplyToEmail, - sendEmailWithPlainText as outlookSendEmailWithPlainText, -} from "@/utils/outlook/mail"; -import { - archiveThread as gmailArchiveThread, - getOrCreateLabel as gmailGetOrCreateLabel, - labelMessage as gmailLabelMessage, - markReadThread as gmailMarkReadThread, - removeThreadLabel as gmailRemoveThreadLabel, -} from "@/utils/gmail/label"; -import { trashThread as gmailTrashThread } from "@/utils/gmail/trash"; -import { - archiveThread as outlookArchiveThread, - getOrCreateLabel as outlookGetOrCreateLabel, - labelMessage as outlookLabelMessage, - markReadThread as outlookMarkReadThread, -} from "@/utils/outlook/label"; -import { trashThread as outlookTrashThread } from "@/utils/outlook/trash"; -import { markSpam as gmailMarkSpam } from "@/utils/gmail/spam"; -import { markSpam as outlookMarkSpam } from "@/utils/outlook/spam"; -import { handlePreviousDraftDeletion } from "@/utils/ai/choose-rule/draft-management"; -import { createScopedLogger } from "@/utils/logger"; -import { - getThreadMessages as getGmailThreadMessages, - getThreadsFromSenderWithSubject as getGmailThreadsFromSenderWithSubject, -} from "@/utils/gmail/thread"; -import { - getThreadMessages as getOutlookThreadMessages, - getThreadsFromSenderWithSubject as getOutlookThreadsFromSenderWithSubject, -} from "@/utils/outlook/thread"; -import { getMessagesBatch } from "@/utils/gmail/message"; -import { getAccessTokenFromClient } from "@/utils/gmail/client"; -import { getGmailAttachment } from "@/utils/gmail/attachment"; -import { getOutlookAttachment } from "@/utils/outlook/attachment"; -import { - getThreadsBatch, - getThreadsWithNextPageToken, -} from "@/utils/gmail/thread"; -import { decodeSnippet } from "@/utils/gmail/decode"; -import { - getAwaitingReplyLabel as getGmailAwaitingReplyLabel, - getReplyTrackingLabels, -} from "@/utils/reply-tracker/label"; -import { getOrCreateLabels as getOutlookOrCreateLabels } from "@/utils/outlook/label"; -import { - AWAITING_REPLY_LABEL_NAME, - NEEDS_REPLY_LABEL_NAME, -} from "@/utils/reply-tracker/consts"; -import { - getDraft as getGmailDraft, - deleteDraft as deleteGmailDraft, -} from "@/utils/gmail/draft"; -import { - getDraft as getOutlookDraft, - deleteDraft as deleteOutlookDraft, -} from "@/utils/outlook/draft"; -import { - getFiltersList as getGmailFiltersList, - createFilter as createGmailFilter, - deleteFilter as deleteGmailFilter, - createAutoArchiveFilter, -} from "@/utils/gmail/filter"; -import { - getFiltersList as getOutlookFiltersList, - createFilter as createOutlookFilter, - deleteFilter as deleteOutlookFilter, - createAutoArchiveFilter as createOutlookAutoArchiveFilter, -} from "@/utils/outlook/filter"; -import { processHistoryForUser as processGmailHistory } from "@/app/api/google/webhook/process-history"; -import { processHistoryForUser as processOutlookHistory } from "@/app/api/outlook/webhook/process-history"; -import { watchGmail, unwatchGmail } from "@/utils/gmail/watch"; -import { watchOutlook, unwatchOutlook } from "@/utils/outlook/watch"; - -const logger = createScopedLogger("email-provider"); - -export interface EmailThread { - id: string; - messages: ParsedMessage[]; - snippet: string; - historyId?: string; -} - -export interface EmailLabel { - id: string; - name: string; - type: string; - threadsTotal?: number; - color?: { - textColor?: string; - backgroundColor?: string; - }; - labelListVisibility?: string; - messageListVisibility?: string; -} - -export interface EmailFilter { - id: string; - criteria?: { - from?: string; - }; - action?: { - addLabelIds?: string[]; - removeLabelIds?: string[]; - }; -} - -export interface EmailProvider { - readonly name: "google" | "microsoft-entra-id"; - getThreads(folderId?: string): Promise; - getThread(threadId: string): Promise; - getLabels(): Promise; - getLabelById(labelId: string): Promise; - getMessage(messageId: string): Promise; - getMessages(query?: string, maxResults?: number): Promise; - getSentMessages(maxResults?: number): Promise; - getThreadMessages(threadId: string): Promise; - getPreviousConversationMessages( - messageIds: string[], - ): Promise; - archiveThread(threadId: string, ownerEmail: string): Promise; - archiveThreadWithLabel( - threadId: string, - ownerEmail: string, - labelId?: string, - ): Promise; - trashThread( - threadId: string, - ownerEmail: string, - actionSource: "user" | "automation", - ): Promise; - labelMessage(messageId: string, labelName: string): Promise; - removeThreadLabel(threadId: string, labelId: string): Promise; - getAwaitingReplyLabel(): Promise; - draftEmail( - email: ParsedMessage, - args: { to?: string; subject?: string; content: string }, - executedRule?: { id: string; threadId: string; emailAccountId: string }, - ): Promise<{ draftId: string }>; - replyToEmail(email: ParsedMessage, content: string): Promise; - sendEmail(args: { - to: string; - cc?: string; - bcc?: string; - subject: string; - messageText: string; - }): Promise; - forwardEmail( - email: ParsedMessage, - args: { to: string; cc?: string; bcc?: string; content?: string }, - ): Promise; - markSpam(threadId: string): Promise; - markRead(threadId: string): Promise; - markReadThread(threadId: string, read: boolean): Promise; - getDraft(draftId: string): Promise; - deleteDraft(draftId: string): Promise; - createLabel(name: string, description?: string): Promise; - getOrCreateInboxZeroLabel(key: InboxZeroLabel): Promise; - getOriginalMessage( - originalMessageId: string | undefined, - ): Promise; - getFiltersList(): Promise; - createFilter(options: { - from: string; - addLabelIds?: string[]; - removeLabelIds?: string[]; - }): Promise; - deleteFilter(id: string): Promise; - createAutoArchiveFilter(options: { - from: string; - gmailLabelId?: string; - labelName?: string; - }): Promise; - getMessagesWithPagination(options: { - query?: string; - maxResults?: number; - pageToken?: string; - before?: Date; - after?: Date; - }): Promise<{ - messages: ParsedMessage[]; - nextPageToken?: string; - }>; - getMessagesBatch(messageIds: string[]): Promise; - getAccessToken(): string; - checkIfReplySent(senderEmail: string): Promise; - countReceivedMessages( - senderEmail: string, - threshold: number, - ): Promise; - getAttachment( - messageId: string, - attachmentId: string, - ): Promise<{ data: string; size: number }>; - getThreadsWithQuery(options: { - query?: ThreadsQuery; - maxResults?: number; - pageToken?: string; - }): Promise<{ - threads: EmailThread[]; - nextPageToken?: string; - }>; - hasPreviousCommunicationsWithSenderOrDomain(options: { - from: string; - date: Date; - messageId: string; - }): Promise; - getThreadsFromSenderWithSubject( - sender: string, - limit: number, - ): Promise>; - getReplyTrackingLabels(): Promise<{ - awaitingReplyLabelId: string; - needsReplyLabelId: string; - }>; - processHistory(options: { - emailAddress: string; - historyId?: number; - startHistoryId?: number; - subscriptionId?: string; - resourceData?: { - id: string; - conversationId?: string; - }; - }): Promise; - watchEmails(): Promise<{ - expirationDate: Date; - subscriptionId?: string; - } | null>; - unwatchEmails(subscriptionId?: string): Promise; - isReplyInThread(message: ParsedMessage): boolean; -} - -export class GmailProvider implements EmailProvider { - readonly name = "google"; - private client: gmail_v1.Gmail; - constructor(client: gmail_v1.Gmail) { - this.client = client; - } - - async getThreads(folderId?: string): Promise { - const response = await this.client.users.threads.list({ - userId: "me", - q: folderId ? `in:${folderId}` : undefined, - }); - - const threads = response.data.threads || []; - const threadPromises = threads.map((thread) => this.getThread(thread.id!)); - return Promise.all(threadPromises); - } - - async getThread(threadId: string): Promise { - const response = await this.client.users.threads.get({ - userId: "me", - id: threadId, - }); - - const messages = response.data.messages || []; - const messagePromises = messages.map((message) => - this.getMessage(message.id!), - ); - - return { - id: threadId, - messages: await Promise.all(messagePromises), - snippet: response.data.snippet || "", - historyId: response.data.historyId || undefined, - }; - } - - async getLabels(): Promise { - const labels = await getGmailLabels(this.client); - return (labels || []) - .filter( - (label) => - label.type === "user" && - label.labelListVisibility !== labelVisibility.labelHide, - ) - .map((label) => ({ - id: label.id!, - name: label.name!, - type: label.type!, - threadsTotal: label.threadsTotal || undefined, - labelListVisibility: label.labelListVisibility || undefined, - messageListVisibility: label.messageListVisibility || undefined, - })); - } - - async getLabelById(labelId: string): Promise { - try { - const label = await getGmailLabelById({ - gmail: this.client, - id: labelId, - }); - return { - id: label.id!, - name: label.name!, - type: label.type!, - threadsTotal: label.threadsTotal || undefined, - }; - } catch { - return null; - } - } - - async getMessage(messageId: string): Promise { - const message = await getGmailMessage(messageId, this.client, "full"); - return parseMessage(message); - } - - async getMessages(query?: string, maxResults = 50): Promise { - const response = await getGmailMessages(this.client, { - query, - maxResults, - }); - const messages = response.messages || []; - return messages - .filter((message) => message.payload) - .map((message) => parseMessage(message as any)); - } - - async getSentMessages(maxResults = 20): Promise { - return getGmailSentMessages(this.client, maxResults); - } - - async archiveThread(threadId: string, ownerEmail: string): Promise { - await gmailArchiveThread({ - gmail: this.client, - threadId, - ownerEmail, - actionSource: "automation", - }); - } - - async archiveThreadWithLabel( - threadId: string, - ownerEmail: string, - labelId?: string, - ): Promise { - await gmailArchiveThread({ - gmail: this.client, - threadId, - ownerEmail, - actionSource: "user", - labelId, - }); - } - - async trashThread( - threadId: string, - ownerEmail: string, - actionSource: "user" | "automation", - ): Promise { - await gmailTrashThread({ - gmail: this.client, - threadId, - ownerEmail, - actionSource, - }); - } - - async labelMessage(messageId: string, labelName: string): Promise { - const label = await gmailGetOrCreateLabel({ - gmail: this.client, - name: labelName, - }); - if (!label.id) - throw new Error("Label not found and unable to create label"); - await gmailLabelMessage({ - gmail: this.client, - messageId, - addLabelIds: [label.id], - }); - } - - async getDraft(draftId: string): Promise { - return getGmailDraft(draftId, this.client); - } - - async deleteDraft(draftId: string): Promise { - await deleteGmailDraft(this.client, draftId); - } - - async draftEmail( - email: ParsedMessage, - args: { to?: string; subject?: string; content: string }, - executedRule?: { id: string; threadId: string; emailAccountId: string }, - ): Promise<{ draftId: string }> { - if (executedRule) { - // Run draft creation and previous draft deletion in parallel - const [result] = await Promise.all([ - gmailDraftEmail(this.client, email, args), - handlePreviousDraftDeletion({ - client: this, - executedRule, - logger, - }), - ]); - return { draftId: result.data.id || "" }; - } else { - const result = await gmailDraftEmail(this.client, email, args); - return { draftId: result.data.id || "" }; - } - } - - async replyToEmail(email: ParsedMessage, content: string): Promise { - await gmailReplyToEmail(this.client, email, content); - } - - async sendEmail(args: { - to: string; - cc?: string; - bcc?: string; - subject: string; - messageText: string; - }): Promise { - await gmailSendEmailWithPlainText(this.client, args); - } - - async forwardEmail( - email: ParsedMessage, - args: { to: string; cc?: string; bcc?: string; content?: string }, - ): Promise { - await gmailForwardEmail(this.client, { messageId: email.id, ...args }); - } - - async markSpam(threadId: string): Promise { - await gmailMarkSpam({ gmail: this.client, threadId }); - } - - async markRead(threadId: string): Promise { - await gmailMarkReadThread({ - gmail: this.client, - threadId, - read: true, - }); - } - - async getThreadMessages(threadId: string): Promise { - return getGmailThreadMessages(threadId, this.client); - } - - async getPreviousConversationMessages( - messageIds: string[], - ): Promise { - return getMessagesBatch({ - messageIds, - accessToken: getAccessTokenFromClient(this.client), - }); - } - - async removeThreadLabel(threadId: string, labelId: string): Promise { - await gmailRemoveThreadLabel(this.client, threadId, labelId); - } - - async getAwaitingReplyLabel(): Promise { - return getGmailAwaitingReplyLabel(this.client); - } - - async createLabel(name: string): Promise { - const label = await createGmailLabel({ - gmail: this.client, - name, - messageListVisibility: messageVisibility.show, - labelListVisibility: labelVisibility.labelShow, - }); - - return { - id: label.id!, - name: label.name!, - type: label.type!, - }; - } - - async getOrCreateInboxZeroLabel(key: InboxZeroLabel): Promise { - const label = await getOrCreateGmailInboxZeroLabel({ - gmail: this.client, - key, - }); - return { - id: label.id!, - name: label.name!, - type: label.type!, - threadsTotal: label.threadsTotal || undefined, - }; - } - - async getOriginalMessage( - originalMessageId: string | undefined, - ): Promise { - if (!originalMessageId) return null; - const originalMessage = await getMessageByRfc822Id( - originalMessageId, - this.client, - ); - if (!originalMessage) return null; - return parseMessage(originalMessage); - } - - async getFiltersList(): Promise { - const response = await getGmailFiltersList({ gmail: this.client }); - return (response.data.filter || []).map((filter) => ({ - id: filter.id || "", - criteria: { - from: filter.criteria?.from || undefined, - }, - action: { - addLabelIds: filter.action?.addLabelIds || undefined, - removeLabelIds: filter.action?.removeLabelIds || undefined, - }, - })); - } - - async createFilter(options: { - from: string; - addLabelIds?: string[]; - removeLabelIds?: string[]; - }): Promise { - return createGmailFilter({ gmail: this.client, ...options }); - } - - async createAutoArchiveFilter(options: { - from: string; - gmailLabelId?: string; - }): Promise { - return createAutoArchiveFilter({ - gmail: this.client, - from: options.from, - gmailLabelId: options.gmailLabelId, - }); - } - - async deleteFilter(id: string): Promise { - return deleteGmailFilter({ gmail: this.client, id }); - } - - async getMessagesWithPagination(options: { - query?: string; - maxResults?: number; - pageToken?: string; - before?: Date; - after?: Date; - }): Promise<{ - messages: ParsedMessage[]; - nextPageToken?: string; - }> { - // Build query string for date filtering - let query = options.query || ""; - - if (options.before) { - query += ` before:${Math.floor(options.before.getTime() / 1000) + 1}`; - } - - if (options.after) { - query += ` after:${Math.floor(options.after.getTime() / 1000) - 1}`; - } - - const response = await getGmailMessages(this.client, { - query: query.trim() || undefined, - maxResults: options.maxResults || 20, - pageToken: options.pageToken || undefined, - }); - - const messages = response.messages || []; - const messagePromises = messages.map((message) => - this.getMessage(message.id!), - ); - - return { - messages: await Promise.all(messagePromises), - nextPageToken: response.nextPageToken || undefined, - }; - } - - async getMessagesBatch(messageIds: string[]): Promise { - return getMessagesBatch({ - messageIds, - accessToken: getAccessTokenFromClient(this.client), - }); - } - - getAccessToken(): string { - return getAccessTokenFromClient(this.client); - } - - async markReadThread(threadId: string, read: boolean): Promise { - await gmailMarkReadThread({ - gmail: this.client, - threadId, - read, - }); - } - - async checkIfReplySent(senderEmail: string): Promise { - try { - const query = `from:me to:${senderEmail} label:sent`; - const response = await getGmailMessages(this.client, { - query, - maxResults: 1, - }); - const sent = (response.messages?.length ?? 0) > 0; - logger.info("Checked for sent reply", { senderEmail, sent }); - return sent; - } catch (error) { - logger.error("Error checking if reply was sent", { - error, - senderEmail, - }); - return true; // Default to true on error (safer for TO_REPLY filtering) - } - } - - async countReceivedMessages( - senderEmail: string, - threshold: number, - ): Promise { - try { - const query = `from:${senderEmail}`; - logger.info(`Checking received message count (up to ${threshold})`, { - senderEmail, - threshold, - }); - - // Fetch up to the threshold number of message IDs. - const response = await getGmailMessages(this.client, { - query, - maxResults: threshold, - }); - const count = response.messages?.length ?? 0; - - logger.info("Received message count check result", { - senderEmail, - count, - }); - return count; - } catch (error) { - logger.error("Error counting received messages", { - error, - senderEmail, - }); - return 0; // Default to 0 on error - } - } - - async getAttachment( - messageId: string, - attachmentId: string, - ): Promise<{ data: string; size: number }> { - const attachment = await getGmailAttachment( - this.client, - messageId, - attachmentId, - ); - return { - data: attachment.data || "", - size: attachment.size || 0, - }; - } - - async getThreadsWithQuery(options: { - query?: ThreadsQuery; - maxResults?: number; - pageToken?: string; - }): Promise<{ - threads: EmailThread[]; - nextPageToken?: string; - }> { - const query = options.query; - - function getQuery() { - if (query?.q) { - return query.q; - } - if (query?.fromEmail) { - return `from:${query.fromEmail}`; - } - if (query?.type === "archive") { - return `-label:${GmailLabel.INBOX}`; - } - return undefined; - } - - function getLabelIds(type?: string | null) { - switch (type) { - case "inbox": - return [GmailLabel.INBOX]; - case "sent": - return [GmailLabel.SENT]; - case "draft": - return [GmailLabel.DRAFT]; - case "trash": - return [GmailLabel.TRASH]; - case "spam": - return [GmailLabel.SPAM]; - case "starred": - return [GmailLabel.STARRED]; - case "important": - return [GmailLabel.IMPORTANT]; - case "unread": - return [GmailLabel.UNREAD]; - case "archive": - return undefined; - case "all": - return undefined; - default: - if (!type || type === "undefined" || type === "null") - return [GmailLabel.INBOX]; - return [type]; - } - } - - const { threads: gmailThreads, nextPageToken } = - await getThreadsWithNextPageToken({ - gmail: this.client, - q: getQuery(), - labelIds: query?.labelId - ? [query.labelId] - : getLabelIds(query?.type) || [], - maxResults: options.maxResults || 50, - pageToken: options.pageToken || undefined, - }); - - const threadIds = - gmailThreads?.map((t) => t.id).filter((id): id is string => !!id) || []; - const threads = await getThreadsBatch( - threadIds, - getAccessTokenFromClient(this.client), - ); - - const emailThreads: EmailThread[] = threads - .map((thread) => { - const id = thread.id; - if (!id) return null; - - const emailThread: EmailThread = { - id, - messages: - thread.messages?.map((message) => parseMessage(message as any)) || - [], - snippet: decodeSnippet(thread.snippet), - historyId: thread.historyId || undefined, - }; - return emailThread; - }) - .filter((thread): thread is EmailThread => thread !== null); - - return { - threads: emailThreads, - nextPageToken: nextPageToken || undefined, - }; - } - - async hasPreviousCommunicationsWithSenderOrDomain(options: { - from: string; - date: Date; - messageId: string; - }): Promise { - return hasPreviousCommunicationsWithSenderOrDomain(this, options); - } - - async getThreadsFromSenderWithSubject( - sender: string, - limit: number, - ): Promise> { - return getGmailThreadsFromSenderWithSubject( - this.client, - this.getAccessToken(), - sender, - limit, - ); - } - - async getReplyTrackingLabels(): Promise<{ - awaitingReplyLabelId: string; - needsReplyLabelId: string; - }> { - return getReplyTrackingLabels(this.client); - } - - async processHistory(options: { - emailAddress: string; - historyId?: number; - startHistoryId?: number; - subscriptionId?: string; - resourceData?: { - id: string; - conversationId?: string; - }; - }): Promise { - await processGmailHistory( - { - emailAddress: options.emailAddress, - historyId: options.historyId || 0, - }, - { - startHistoryId: options.startHistoryId?.toString(), - }, - ); - } - - async watchEmails(): Promise<{ - expirationDate: Date; - subscriptionId?: string; - } | null> { - const res = await watchGmail(this.client); - - if (res.expiration) { - const expirationDate = new Date(+res.expiration); - return { expirationDate }; - } - return null; - } - - async unwatchEmails(): Promise { - await unwatchGmail(this.client); - } - - // Gmail: The first message id in a thread is the threadId - isReplyInThread(message: ParsedMessage): boolean { - return !!(message.id && message.id !== message.threadId); - } -} - -export class OutlookProvider implements EmailProvider { - readonly name = "microsoft-entra-id"; - private client: OutlookClient; - - constructor(client: OutlookClient) { - this.client = client; - } - - async getThreads(folderId?: string): Promise { - const messages = await this.getMessages(folderId); - const threadMap = new Map(); - - messages.forEach((message) => { - const threadId = message.threadId; - if (!threadMap.has(threadId)) { - threadMap.set(threadId, []); - } - threadMap.get(threadId)!.push(message); - }); - - return Array.from(threadMap.entries()).map(([id, messages]) => ({ - id, - messages, - snippet: messages[0]?.snippet || "", - })); - } - - async getThread(threadId: string): Promise { - const messages = await this.getMessages(`conversationId:${threadId}`); - return { - id: threadId, - messages, - snippet: messages[0]?.snippet || "", - }; - } - - async getLabels(): Promise { - const labels = await getOutlookLabels(this.client); - return labels.map((label) => ({ - id: label.id || "", - name: label.displayName || "", - type: "user", - })); - } - - async getLabelById(labelId: string): Promise { - const labels = await this.getLabels(); - return labels.find((label) => label.id === labelId) || null; - } - - async getMessage(messageId: string): Promise { - return getOutlookMessage(messageId, this.client); - } - - async getMessages(query?: string, maxResults = 50): Promise { - const allMessages: ParsedMessage[] = []; - let pageToken: string | undefined; - const pageSize = 20; // Outlook API limit - - while (allMessages.length < maxResults) { - const response = await getOutlookBatchMessages(this.client, { - query, - maxResults: Math.min(pageSize, maxResults - allMessages.length), - pageToken, - }); - - const messages = response.messages || []; - allMessages.push(...messages); - - // If we got fewer messages than requested, we've reached the end - if (messages.length < pageSize || !response.nextPageToken) { - break; - } - - pageToken = response.nextPageToken; - } - - return allMessages; - } - - async getSentMessages(maxResults = 20): Promise { - const folderIds = await getFolderIds(this.client); - const sentItemsFolderId = folderIds.sentitems; - - if (!sentItemsFolderId) { - logger.warn("Could not find sent items folder"); - return []; - } - - const response = await getOutlookBatchMessages(this.client, { - maxResults, - folderId: sentItemsFolderId, - }); - - return response.messages || []; - } - - async archiveThread(threadId: string, ownerEmail: string): Promise { - await outlookArchiveThread({ - client: this.client, - threadId, - ownerEmail, - actionSource: "automation", - }); - } - - async archiveThreadWithLabel( - threadId: string, - ownerEmail: string, - labelId?: string, - ): Promise { - await outlookArchiveThread({ - client: this.client, - threadId, - ownerEmail, - actionSource: "user", - labelId, - }); - } - - async trashThread( - threadId: string, - ownerEmail: string, - actionSource: "user" | "automation", - ): Promise { - await outlookTrashThread({ - client: this.client, - threadId, - ownerEmail, - actionSource, - }); - } - - async labelMessage(messageId: string, labelName: string): Promise { - const label = await outlookGetOrCreateLabel({ - client: this.client, - name: labelName, - }); - await outlookLabelMessage({ - client: this.client, - messageId, - categories: [label.displayName || ""], - }); - } - - async getDraft(draftId: string): Promise { - return getOutlookDraft(draftId, this.client); - } - - async deleteDraft(draftId: string): Promise { - await deleteOutlookDraft(this.client, draftId); - } - - async draftEmail( - email: ParsedMessage, - args: { to?: string; subject?: string; content: string }, - executedRule?: { id: string; threadId: string; emailAccountId: string }, - ): Promise<{ draftId: string }> { - if (executedRule) { - // Run draft creation and previous draft deletion in parallel - const [result] = await Promise.all([ - outlookDraftEmail(this.client, email, args), - handlePreviousDraftDeletion({ - client: this, - executedRule, - logger, - }), - ]); - return { draftId: result.id }; - } else { - const result = await outlookDraftEmail(this.client, email, args); - return { draftId: result.id }; - } - } - - async replyToEmail(email: ParsedMessage, content: string): Promise { - await outlookReplyToEmail(this.client, email, content); - } - - async sendEmail(args: { - to: string; - cc?: string; - bcc?: string; - subject: string; - messageText: string; - }): Promise { - await outlookSendEmailWithPlainText(this.client, args); - } - - async forwardEmail( - email: ParsedMessage, - args: { to: string; cc?: string; bcc?: string; content?: string }, - ): Promise { - await outlookForwardEmail(this.client, { messageId: email.id, ...args }); - } - - async markSpam(threadId: string): Promise { - await outlookMarkSpam(this.client, threadId); - } - - async markRead(threadId: string): Promise { - await outlookMarkReadThread({ - client: this.client, - threadId, - read: true, - }); - } - - async getThreadMessages(threadId: string): Promise { - return getOutlookThreadMessages(threadId, this.client); - } - - async getPreviousConversationMessages( - messageIds: string[], - ): Promise { - return this.getThreadMessages(messageIds[0]); - } - - async removeThreadLabel(_threadId: string, _labelId: string): Promise { - // For Outlook, we don't need to do anything with labels at this point - return Promise.resolve(); - } - - async createLabel(name: string): Promise { - const label = await createOutlookLabel({ - client: this.client, - name, - }); - - return { - id: label.id, - name: label.displayName || label.id, - type: "user", - }; - } - - async getOrCreateInboxZeroLabel(key: InboxZeroLabel): Promise { - const label = await getOrCreateOutlookInboxZeroLabel({ - client: this.client, - key, - }); - return { - id: label.id, - name: label.displayName || label.id, - type: "user", - }; - } - - async getOriginalMessage( - originalMessageId: string | undefined, - ): Promise { - if (!originalMessageId) return null; - try { - return await this.getMessage(originalMessageId); - } catch { - return null; - } - } - - async getFiltersList(): Promise { - try { - const response = await getOutlookFiltersList({ client: this.client }); - - const mappedFilters = (response.value || []).map( - (filter: { - id: string; - conditions: { senderContains: string[] }; - actions: { applyCategories: string[]; moveToFolder: string }; - }) => { - const mappedFilter = { - id: filter.id || "", - criteria: { - from: filter.conditions?.senderContains?.[0] || undefined, - }, - action: { - addLabelIds: filter.actions?.applyCategories || undefined, - removeLabelIds: filter.actions?.moveToFolder - ? ["INBOX"] - : undefined, - }, - }; - return mappedFilter; - }, - ); - - return mappedFilters; - } catch (error) { - logger.error("Error in Outlook getFiltersList", { error }); - throw error; - } - } - - async createFilter(options: { - from: string; - addLabelIds?: string[]; - removeLabelIds?: string[]; - }): Promise { - return createOutlookFilter({ client: this.client, ...options }); - } - - async createAutoArchiveFilter(options: { - from: string; - labelName?: string; - }): Promise { - return createOutlookAutoArchiveFilter({ - client: this.client, - from: options.from, - labelName: options.labelName, - }); - } - - async deleteFilter(id: string): Promise { - return deleteOutlookFilter({ client: this.client, id }); - } - - async getMessagesWithPagination(options: { - query?: string; - maxResults?: number; - pageToken?: string; - before?: Date; - after?: Date; - }): Promise<{ - messages: ParsedMessage[]; - nextPageToken?: string; - }> { - // For Outlook, we need to handle date filtering differently - // Microsoft Graph API uses different date filtering syntax - let query = options.query || ""; - - // Build date filter for Outlook - const dateFilters: string[] = []; - if (options.before) { - dateFilters.push(`receivedDateTime lt ${options.before.toISOString()}`); - } - if (options.after) { - dateFilters.push(`receivedDateTime gt ${options.after.toISOString()}`); - } - - // Combine date filters with existing query - if (dateFilters.length > 0) { - const dateFilter = dateFilters.join(" and "); - query = query ? `${query} and ${dateFilter}` : dateFilter; - } - - // Get folder IDs to get the inbox folder ID - const folderIds = await getFolderIds(this.client); - const inboxFolderId = folderIds.inbox; - - if (!inboxFolderId) { - throw new Error("Could not find inbox folder ID"); - } - - const response = await getOutlookBatchMessages(this.client, { - query: query.trim() || undefined, - maxResults: options.maxResults || 20, - pageToken: options.pageToken, - folderId: inboxFolderId, // Pass the inbox folder ID to match original behavior - }); - - return { - messages: response.messages || [], - nextPageToken: response.nextPageToken, - }; - } - - async getMessagesBatch(messageIds: string[]): Promise { - // For Outlook, we need to fetch messages individually since there's no batch endpoint - const messagePromises = messageIds.map((messageId) => - this.getMessage(messageId), - ); - return Promise.all(messagePromises); - } - - getAccessToken(): string { - return this.client.getAccessToken(); - } - - async markReadThread(threadId: string, read: boolean): Promise { - await outlookMarkReadThread({ - client: this.client, - threadId, - read, - }); - } - async checkIfReplySent(senderEmail: string): Promise { - try { - const query = `from:me to:${senderEmail}`; - const response = await getOutlookMessages(this.client, { - query, - maxResults: 1, - }); - const sent = (response.messages?.length ?? 0) > 0; - logger.info("Checked for sent reply", { senderEmail, sent }); - return sent; - } catch (error) { - logger.error("Error checking if reply was sent", { - error, - senderEmail, - }); - return true; // Default to true on error (safer for TO_REPLY filtering) - } - } - - async countReceivedMessages( - senderEmail: string, - threshold: number, - ): Promise { - try { - const query = `from:${senderEmail}`; - logger.info(`Checking received message count (up to ${threshold})`, { - senderEmail, - threshold, - }); - - // Fetch up to the threshold number of messages - const response = await getOutlookMessages(this.client, { - query, - maxResults: threshold, - }); - const count = response.messages?.length ?? 0; - - logger.info("Received message count check result", { - senderEmail, - count, - }); - return count; - } catch (error) { - logger.error("Error counting received messages", { - error, - senderEmail, - }); - return 0; // Default to 0 on error - } - } - - async getAttachment( - messageId: string, - attachmentId: string, - ): Promise<{ data: string; size: number }> { - const attachment = await getOutlookAttachment( - this.client, - messageId, - attachmentId, - ); - - // Outlook attachments return the data directly, not base64 encoded - // We need to convert it to base64 for consistency with Gmail - const data = attachment.contentBytes - ? Buffer.from(attachment.contentBytes, "base64").toString("base64") - : ""; - - return { - data, - size: attachment.size || 0, - }; - } - - async getThreadsWithQuery(options: { - query?: ThreadsQuery; - maxResults?: number; - pageToken?: string; - }): Promise<{ - threads: EmailThread[]; - nextPageToken?: string; - }> { - const query = options.query; - const client = this.client.getClient(); - - // Build the filter query for Microsoft Graph API - function getFilter() { - const filters: string[] = []; - - // Add folder filter based on type or labelId - if (query?.labelId) { - // Use labelId as parentFolderId (should be lowercase for Outlook) - filters.push(`parentFolderId eq '${query.labelId.toLowerCase()}'`); - } else if (query?.type === "all") { - // For "all" type, include both inbox and archive - filters.push( - "(parentFolderId eq 'inbox' or parentFolderId eq 'archive')", - ); - } else { - // Default to inbox only - filters.push("parentFolderId eq 'inbox'"); - } - - // Add other filters - if (query?.fromEmail) { - // Escape single quotes in email address - const escapedEmail = query.fromEmail.replace(/'/g, "''"); - filters.push(`from/emailAddress/address eq '${escapedEmail}'`); - } - - if (query?.q) { - // Escape single quotes in search query - const escapedQuery = query.q.replace(/'/g, "''"); - filters.push( - `(contains(subject,'${escapedQuery}') or contains(bodyPreview,'${escapedQuery}'))`, - ); - } - - return filters.length > 0 ? filters.join(" and ") : undefined; - } - - // Get messages from Microsoft Graph API - const endpoint = "/me/messages"; - - // Build the request - let request = client - .api(endpoint) - .select( - "id,conversationId,conversationIndex,subject,bodyPreview,from,toRecipients,receivedDateTime,isDraft,body,categories,parentFolderId", - ) - .top(options.maxResults || 50); - - // Add filter if present - const filter = getFilter(); - if (filter) { - request = request.filter(filter); - } - - // Only add ordering if we don't have a fromEmail filter to avoid complexity - if (!query?.fromEmail) { - request = request.orderby("receivedDateTime DESC"); - } - - // Handle pagination - if (options.pageToken) { - request = request.skipToken(options.pageToken); - } - - const response = await request.get(); - - // Sort messages by receivedDateTime if we filtered by fromEmail (since we couldn't use orderby) - let sortedMessages = response.value; - if (query?.fromEmail) { - sortedMessages = response.value.sort( - (a: { receivedDateTime: string }, b: { receivedDateTime: string }) => - new Date(b.receivedDateTime).getTime() - - new Date(a.receivedDateTime).getTime(), - ); - } - - // Group messages by conversationId to create threads - const messagesByThread = new Map< - string, - { - conversationId: string; - conversationIndex?: string; - id: string; - bodyPreview: string; - body: { content: string }; - from: { emailAddress: { address: string } }; - toRecipients: { emailAddress: { address: string } }[]; - receivedDateTime: string; - subject: string; - }[] - >(); - sortedMessages.forEach( - (message: { - conversationId: string; - id: string; - bodyPreview: string; - body: { content: string }; - from: { emailAddress: { address: string } }; - toRecipients: { emailAddress: { address: string } }[]; - receivedDateTime: string; - subject: string; - }) => { - // Skip messages without conversationId - if (!message.conversationId) { - logger.warn("Message missing conversationId", { - messageId: message.id, - }); - return; - } - - const messages = messagesByThread.get(message.conversationId) || []; - messages.push(message); - messagesByThread.set(message.conversationId, messages); - }, - ); - - // Convert to EmailThread format - const threads: EmailThread[] = Array.from(messagesByThread.entries()) - .filter(([_threadId, messages]) => messages.length > 0) // Filter out empty threads - .map(([threadId, messages]) => { - // Convert messages to ParsedMessage format - const parsedMessages: ParsedMessage[] = messages.map((message) => { - const subject = message.subject || ""; - const date = message.receivedDateTime || new Date().toISOString(); - - // Add proper null checks for from and toRecipients - const fromAddress = message.from?.emailAddress?.address || ""; - const toAddress = - message.toRecipients?.[0]?.emailAddress?.address || ""; - - return { - id: message.id || "", - threadId: message.conversationId || "", - snippet: message.bodyPreview || "", - textPlain: message.body?.content || "", - textHtml: message.body?.content || "", - headers: { - from: fromAddress, - to: toAddress, - subject, - date, - }, - subject, - date, - labelIds: [], - internalDate: date, - historyId: "", - inline: [], - conversationIndex: message.conversationIndex, - }; - }); - - return { - id: threadId, - messages: parsedMessages, - snippet: messages[0]?.bodyPreview || "", - }; - }); - - return { - threads, - nextPageToken: response["@odata.nextLink"] - ? new URL(response["@odata.nextLink"]).searchParams.get("$skiptoken") || - undefined - : undefined, - }; - } - - async hasPreviousCommunicationsWithSenderOrDomain(options: { - from: string; - date: Date; - messageId: string; - }): Promise { - try { - const response = await this.client - .getClient() - .api("/me/messages") - .filter( - `from/emailAddress/address eq '${options.from}' and receivedDateTime lt ${options.date.toISOString()}`, - ) - .top(2) - .select("id") - .get(); - - // Check if there are any messages from this sender before the current date - // and exclude the current message - const hasPreviousEmail = response.value.some( - (message: { id: string }) => message.id !== options.messageId, - ); - - return hasPreviousEmail; - } catch (error) { - logger.error("Error checking previous communications", { - error, - options, - }); - return false; - } - } - - async getThreadsFromSenderWithSubject( - sender: string, - limit: number, - ): Promise> { - return getOutlookThreadsFromSenderWithSubject(this.client, sender, limit); - } - - async getAwaitingReplyLabel(): Promise { - const [awaitingReplyLabel] = await getOutlookOrCreateLabels({ - client: this.client, - names: [AWAITING_REPLY_LABEL_NAME], - }); - - return awaitingReplyLabel.id || ""; - } - - async getReplyTrackingLabels(): Promise<{ - awaitingReplyLabelId: string; - needsReplyLabelId: string; - }> { - const [awaitingReplyLabel, needsReplyLabel] = - await getOutlookOrCreateLabels({ - client: this.client, - names: [AWAITING_REPLY_LABEL_NAME, NEEDS_REPLY_LABEL_NAME], - }); - - return { - awaitingReplyLabelId: awaitingReplyLabel.id || "", - needsReplyLabelId: needsReplyLabel.id || "", - }; - } - - async processHistory(options: { - emailAddress: string; - historyId?: number; - startHistoryId?: number; - subscriptionId?: string; - resourceData?: { - id: string; - conversationId?: string; - }; - }): Promise { - if (!options.subscriptionId) { - throw new Error( - "subscriptionId is required for Outlook history processing", - ); - } - - await processOutlookHistory({ - subscriptionId: options.subscriptionId, - resourceData: options.resourceData || { - id: options.historyId?.toString() || "0", - conversationId: options.startHistoryId?.toString(), - }, - }); - } - - async watchEmails(): Promise<{ - expirationDate: Date; - subscriptionId?: string; - } | null> { - const subscription = await watchOutlook(this.client.getClient()); - - if (subscription.expirationDateTime) { - const expirationDate = new Date(subscription.expirationDateTime); - return { - expirationDate, - subscriptionId: subscription.id, - }; - } - return null; - } - - async unwatchEmails(subscriptionId?: string): Promise { - if (!subscriptionId) { - logger.warn("No subscription ID provided for Outlook unwatch"); - return; - } - await unwatchOutlook(this.client.getClient(), subscriptionId); - } - - isReplyInThread(message: ParsedMessage): boolean { - try { - return atob(message.conversationIndex || "").length > 22; - } catch (error) { - logger.warn("Invalid conversationIndex base64", { - conversationIndex: message.conversationIndex, - error, - }); - return false; - } - } -} +import { GmailProvider } from "@/utils/email/google"; +import { OutlookProvider } from "@/utils/email/microsoft"; +import type { EmailProvider } from "@/utils/email/types"; export async function createEmailProvider({ emailAccountId, diff --git a/apps/web/utils/email/types.ts b/apps/web/utils/email/types.ts new file mode 100644 index 0000000000..bb4bbb2b1b --- /dev/null +++ b/apps/web/utils/email/types.ts @@ -0,0 +1,160 @@ +import type { ParsedMessage } from "@/utils/types"; +import type { InboxZeroLabel } from "@/utils/label"; +import type { ThreadsQuery } from "@/app/api/threads/validation"; + +export interface EmailThread { + id: string; + messages: ParsedMessage[]; + snippet: string; + historyId?: string; +} + +export interface EmailLabel { + id: string; + name: string; + type: string; + threadsTotal?: number; + color?: { + textColor?: string; + backgroundColor?: string; + }; + labelListVisibility?: string; + messageListVisibility?: string; +} + +export interface EmailFilter { + id: string; + criteria?: { + from?: string; + }; + action?: { + addLabelIds?: string[]; + removeLabelIds?: string[]; + }; +} + +export interface EmailProvider { + readonly name: "google" | "microsoft-entra-id"; + getThreads(folderId?: string): Promise; + getThread(threadId: string): Promise; + getLabels(): Promise; + getLabelById(labelId: string): Promise; + getMessage(messageId: string): Promise; + getMessages(query?: string, maxResults?: number): Promise; + getSentMessages(maxResults?: number): Promise; + getThreadMessages(threadId: string): Promise; + getPreviousConversationMessages( + messageIds: string[], + ): Promise; + archiveThread(threadId: string, ownerEmail: string): Promise; + archiveThreadWithLabel( + threadId: string, + ownerEmail: string, + labelId?: string, + ): Promise; + trashThread( + threadId: string, + ownerEmail: string, + actionSource: "user" | "automation", + ): Promise; + labelMessage(messageId: string, labelName: string): Promise; + removeThreadLabel(threadId: string, labelId: string): Promise; + getAwaitingReplyLabel(): Promise; + draftEmail( + email: ParsedMessage, + args: { to?: string; subject?: string; content: string }, + executedRule?: { id: string; threadId: string; emailAccountId: string }, + ): Promise<{ draftId: string }>; + replyToEmail(email: ParsedMessage, content: string): Promise; + sendEmail(args: { + to: string; + cc?: string; + bcc?: string; + subject: string; + messageText: string; + }): Promise; + forwardEmail( + email: ParsedMessage, + args: { to: string; cc?: string; bcc?: string; content?: string }, + ): Promise; + markSpam(threadId: string): Promise; + markRead(threadId: string): Promise; + markReadThread(threadId: string, read: boolean): Promise; + getDraft(draftId: string): Promise; + deleteDraft(draftId: string): Promise; + createLabel(name: string, description?: string): Promise; + getOrCreateInboxZeroLabel(key: InboxZeroLabel): Promise; + getOriginalMessage( + originalMessageId: string | undefined, + ): Promise; + getFiltersList(): Promise; + createFilter(options: { + from: string; + addLabelIds?: string[]; + removeLabelIds?: string[]; + }): Promise; + deleteFilter(id: string): Promise; + createAutoArchiveFilter(options: { + from: string; + gmailLabelId?: string; + labelName?: string; + }): Promise; + getMessagesWithPagination(options: { + query?: string; + maxResults?: number; + pageToken?: string; + before?: Date; + after?: Date; + }): Promise<{ + messages: ParsedMessage[]; + nextPageToken?: string; + }>; + getMessagesBatch(messageIds: string[]): Promise; + getAccessToken(): string; + checkIfReplySent(senderEmail: string): Promise; + countReceivedMessages( + senderEmail: string, + threshold: number, + ): Promise; + getAttachment( + messageId: string, + attachmentId: string, + ): Promise<{ data: string; size: number }>; + getThreadsWithQuery(options: { + query?: ThreadsQuery; + maxResults?: number; + pageToken?: string; + }): Promise<{ + threads: EmailThread[]; + nextPageToken?: string; + }>; + hasPreviousCommunicationsWithSenderOrDomain(options: { + from: string; + date: Date; + messageId: string; + }): Promise; + getThreadsFromSenderWithSubject( + sender: string, + limit: number, + ): Promise>; + getReplyTrackingLabels(): Promise<{ + awaitingReplyLabelId: string; + needsReplyLabelId: string; + }>; + processHistory(options: { + emailAddress: string; + historyId?: number; + startHistoryId?: number; + subscriptionId?: string; + resourceData?: { + id: string; + conversationId?: string; + }; + }): Promise; + watchEmails(): Promise<{ + expirationDate: Date; + subscriptionId?: string; + } | null>; + unwatchEmails(subscriptionId?: string): Promise; + isReplyInThread(message: ParsedMessage): boolean; +} diff --git a/apps/web/utils/gmail/message.ts b/apps/web/utils/gmail/message.ts index 71f360dc1a..1ff6c30dd2 100644 --- a/apps/web/utils/gmail/message.ts +++ b/apps/web/utils/gmail/message.ts @@ -15,7 +15,7 @@ import { getAccessTokenFromClient } from "@/utils/gmail/client"; import { GmailLabel } from "@/utils/gmail/label"; import { isIgnoredSender } from "@/utils/filter-ignored-senders"; import parse from "gmail-api-parse-message"; -import type { EmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/types"; const logger = createScopedLogger("gmail/message"); diff --git a/apps/web/utils/middleware.ts b/apps/web/utils/middleware.ts index 0281f78ba9..c90ce4dec6 100644 --- a/apps/web/utils/middleware.ts +++ b/apps/web/utils/middleware.ts @@ -11,10 +11,8 @@ import { NO_REFRESH_TOKEN_ERROR_CODE, } from "@/utils/config"; import prisma from "@/utils/prisma"; -import { - createEmailProvider, - type EmailProvider, -} from "@/utils/email/provider"; +import { createEmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/types"; const logger = createScopedLogger("middleware"); diff --git a/apps/web/utils/reply-tracker/check-sender-reply-history.ts b/apps/web/utils/reply-tracker/check-sender-reply-history.ts index 48445847a5..2673e2c597 100644 --- a/apps/web/utils/reply-tracker/check-sender-reply-history.ts +++ b/apps/web/utils/reply-tracker/check-sender-reply-history.ts @@ -1,6 +1,6 @@ import { extractEmailAddress } from "@/utils/email"; import { createScopedLogger } from "@/utils/logger"; -import type { EmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/types"; const logger = createScopedLogger("reply-tracker/query"); diff --git a/apps/web/utils/reply-tracker/draft-tracking.ts b/apps/web/utils/reply-tracker/draft-tracking.ts index 86d65c3931..d189755fda 100644 --- a/apps/web/utils/reply-tracker/draft-tracking.ts +++ b/apps/web/utils/reply-tracker/draft-tracking.ts @@ -6,7 +6,7 @@ import { createScopedLogger } from "@/utils/logger"; import { calculateSimilarity } from "@/utils/similarity-score"; import { getDraft, deleteDraft } from "@/utils/gmail/draft"; import { formatError } from "@/utils/error"; -import type { EmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/types"; const logger = createScopedLogger("draft-tracking"); diff --git a/apps/web/utils/reply-tracker/generate-draft.ts b/apps/web/utils/reply-tracker/generate-draft.ts index 9823910922..87c0629434 100644 --- a/apps/web/utils/reply-tracker/generate-draft.ts +++ b/apps/web/utils/reply-tracker/generate-draft.ts @@ -10,7 +10,7 @@ import prisma from "@/utils/prisma"; import { aiExtractRelevantKnowledge } from "@/utils/ai/knowledge/extract"; import { stringifyEmail } from "@/utils/stringify-email"; import { aiExtractFromEmailHistory } from "@/utils/ai/knowledge/extract-from-email-history"; -import type { EmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/types"; import { aiCollectReplyContext } from "@/utils/ai/reply/reply-context-collector"; const logger = createScopedLogger("generate-reply"); @@ -111,6 +111,7 @@ async function generateDraftContent( if (!lastMessage) throw new Error("No message provided"); + // Check Redis cache for reply const reply = await getReply({ emailAccountId: emailAccount.id, messageId: lastMessage.id, diff --git a/apps/web/utils/reply-tracker/inbound.ts b/apps/web/utils/reply-tracker/inbound.ts index db8d748602..9ae26c3221 100644 --- a/apps/web/utils/reply-tracker/inbound.ts +++ b/apps/web/utils/reply-tracker/inbound.ts @@ -6,7 +6,7 @@ import type { ParsedMessage } from "@/utils/types"; import { internalDateToDate } from "@/utils/date"; import { getEmailForLLM } from "@/utils/get-email-from-message"; import { aiChooseRule } from "@/utils/ai/choose-rule/ai-choose-rule"; -import type { EmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/types"; /** * Marks an email thread as needing a reply. diff --git a/apps/web/utils/reply-tracker/outbound.ts b/apps/web/utils/reply-tracker/outbound.ts index f131500273..f6cb1559b4 100644 --- a/apps/web/utils/reply-tracker/outbound.ts +++ b/apps/web/utils/reply-tracker/outbound.ts @@ -10,7 +10,7 @@ import { getEmailForLLM } from "@/utils/get-email-from-message"; import { getReplyTrackingLabels } from "@/utils/reply-tracker/label"; import { labelMessage, removeThreadLabel } from "@/utils/gmail/label"; import { internalDateToDate } from "@/utils/date"; -import type { EmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/types"; export async function handleOutboundReply({ emailAccount, diff --git a/apps/web/utils/scheduled-actions/executor.ts b/apps/web/utils/scheduled-actions/executor.ts index 9e540139ac..2e385079d7 100644 --- a/apps/web/utils/scheduled-actions/executor.ts +++ b/apps/web/utils/scheduled-actions/executor.ts @@ -8,7 +8,7 @@ import { createScopedLogger } from "@/utils/logger"; import { getEmailAccountWithAiAndTokens } from "@/utils/user/get"; import { runActionFunction } from "@/utils/ai/actions"; import type { ActionItem, EmailForAction } from "@/utils/ai/types"; -import type { EmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/types"; const logger = createScopedLogger("scheduled-actions-executor"); diff --git a/apps/web/utils/user/delete.ts b/apps/web/utils/user/delete.ts index 293ef52ca5..d0d81ee0e4 100644 --- a/apps/web/utils/user/delete.ts +++ b/apps/web/utils/user/delete.ts @@ -6,10 +6,8 @@ import { deleteTinybirdAiCalls } from "@inboxzero/tinybird-ai-analytics"; import { deletePosthogUser, trackUserDeleted } from "@/utils/posthog"; import { captureException } from "@/utils/error"; import { unwatchEmails } from "@/app/api/watch/controller"; -import { - createEmailProvider, - type EmailProvider, -} from "@/utils/email/provider"; +import { createEmailProvider } from "@/utils/email/provider"; +import type { EmailProvider } from "@/utils/email/types"; import { createScopedLogger } from "@/utils/logger"; import { sleep } from "@/utils/sleep"; diff --git a/version.txt b/version.txt index a6316f06bb..d07897c403 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v2.3.0 \ No newline at end of file +v2.3.1 \ No newline at end of file