diff --git a/apps/web/app/api/clean/gmail/simple/route.test.ts b/apps/web/app/api/clean/gmail/simple/route.test.ts new file mode 100644 index 0000000000..85fcc793c9 --- /dev/null +++ b/apps/web/app/api/clean/gmail/simple/route.test.ts @@ -0,0 +1,252 @@ +// Mock server-only as per testing guidelines +vi.mock("server-only", () => ({})); + +// Mock env +vi.mock("@/env", () => ({ + env: { + QSTASH_TOKEN: undefined, + INTERNAL_API_KEY: "test-internal-key", + EMAIL_ENCRYPT_SECRET: "test-secret", + EMAIL_ENCRYPT_SALT: "test-salt", + }, +})); + +// Mock encryption to avoid env issues +vi.mock("@/utils/encryption", () => ({ + encrypt: vi.fn((val) => val), + decrypt: vi.fn((val) => val), +})); + +// Mock auth to avoid encryption chain +vi.mock("@/utils/auth", () => ({})); + +// Mock next/headers +vi.mock("next/headers", () => ({ + headers: vi.fn(() => new Headers()), +})); + +// Mock Redis +vi.mock("@/utils/redis", () => ({ + redis: null, +})); + +// Mock internal-api +vi.mock("@/utils/internal-api", () => ({ + isValidInternalApiKey: vi.fn(), +})); + +// Mock Prisma +vi.mock("@/utils/prisma", () => ({ + default: { + emailAccount: { + findUnique: vi.fn(), + }, + cleanupThread: { + create: vi.fn(), + }, + }, +})); + +// Mock heavy dependencies +vi.mock("@/utils/gmail/client", () => ({ + getGmailClientWithRefresh: vi.fn(), +})); + +vi.mock("@/utils/gmail/label", () => ({ + GmailLabel: { + INBOX: "INBOX", + UNREAD: "UNREAD", + }, + labelThread: vi.fn(), +})); + +vi.mock("@/utils/redis/clean", () => ({ + updateThread: vi.fn(), +})); + +import { NextRequest } from "next/server"; +import { beforeEach, describe, expect, test, vi } from "vitest"; +import { isValidInternalApiKey } from "@/utils/internal-api"; +import { env } from "@/env"; +import prisma from "@/utils/prisma"; +import { getGmailClientWithRefresh } from "@/utils/gmail/client"; +import { labelThread } from "@/utils/gmail/label"; +import { POST } from "./route"; + +const mockIsValidInternalApiKey = 
vi.mocked(isValidInternalApiKey); + +describe("/api/clean/gmail/simple", () => { + beforeEach(() => { + vi.clearAllMocks(); + // Reset env mock + (env as { QSTASH_TOKEN: string | undefined }).QSTASH_TOKEN = undefined; + }); + + const createMockRequest = (body: Record<string, unknown>) => { + return new NextRequest("http://localhost/api/clean/gmail/simple", { + method: "POST", + headers: { + "Content-Type": "application/json", + "x-api-key": "test-api-key", + }, + body: JSON.stringify(body), + }); + }; + + const validBody = { + emailAccountId: "test-email-account-id", + threadId: "test-thread-id", + markDone: true, + action: "ARCHIVE", + markedDoneLabelId: "label-1", + processedLabelId: "label-2", + jobId: "job-1", + }; + + describe("QStash guard", () => { + test("should return 403 when QSTASH_TOKEN is set", async () => { + (env as { QSTASH_TOKEN: string | undefined }).QSTASH_TOKEN = + "qstash-token"; + + const request = createMockRequest(validBody); + const response = await POST(request); + const responseBody = await response.json(); + + expect(response.status).toBe(403); + expect(responseBody.error).toBe( + "Qstash is set. 
This endpoint is disabled.", + ); + }); + + test("should proceed when QSTASH_TOKEN is not set", async () => { + mockIsValidInternalApiKey.mockReturnValue(false); + + const request = createMockRequest(validBody); + const response = await POST(request); + + // Should get past QStash check and fail on API key + expect(response.status).toBe(401); + }); + }); + + describe("API key validation", () => { + test("should return 401 when API key is invalid", async () => { + mockIsValidInternalApiKey.mockReturnValue(false); + + const request = createMockRequest(validBody); + const response = await POST(request); + const responseBody = await response.json(); + + expect(response.status).toBe(401); + expect(responseBody.error).toBe("Invalid API key"); + }); + + test("should proceed when API key is valid", async () => { + mockIsValidInternalApiKey.mockReturnValue(true); + + // Will fail on next step (missing mocks for performGmailAction dependencies) + // but this proves API key validation passed + const request = createMockRequest(validBody); + const response = await POST(request); + + // Should get past API key check + expect(response.status).not.toBe(401); + expect(mockIsValidInternalApiKey).toHaveBeenCalled(); + }); + }); + + describe("Input validation", () => { + test("should return 400 when body is missing required fields", async () => { + mockIsValidInternalApiKey.mockReturnValue(true); + + const request = createMockRequest({}); + const response = await POST(request); + const responseBody = await response.json(); + + expect(response.status).toBe(400); + expect(responseBody.error).toBeDefined(); + }); + + test("should return 400 when action is invalid", async () => { + mockIsValidInternalApiKey.mockReturnValue(true); + + const request = createMockRequest({ + ...validBody, + action: "INVALID_ACTION", + }); + const response = await POST(request); + const responseBody = await response.json(); + + expect(response.status).toBe(400); + expect(responseBody.error).toBeDefined(); + 
}); + + test("should return 400 when markDone is not a boolean", async () => { + mockIsValidInternalApiKey.mockReturnValue(true); + + const request = createMockRequest({ + ...validBody, + markDone: "yes", + }); + const response = await POST(request); + const responseBody = await response.json(); + + expect(response.status).toBe(400); + expect(responseBody.error).toBeDefined(); + }); + }); + + describe("Archived flag logic", () => { + const mockAccount = { + account: { + access_token: "test-access-token", + refresh_token: "test-refresh-token", + expires_at: new Date(Date.now() + 3_600_000), + }, + }; + + beforeEach(() => { + mockIsValidInternalApiKey.mockReturnValue(true); + vi.mocked(prisma.emailAccount.findUnique).mockResolvedValue( + mockAccount as never, + ); + vi.mocked(getGmailClientWithRefresh).mockResolvedValue({} as never); + vi.mocked(labelThread).mockResolvedValue(undefined); + vi.mocked(prisma.cleanupThread.create).mockResolvedValue({} as never); + }); + + test("should set archived=true when action is ARCHIVE and markDone=true", async () => { + const request = createMockRequest({ + ...validBody, + action: "ARCHIVE", + markDone: true, + }); + await POST(request); + + expect(prisma.cleanupThread.create).toHaveBeenCalledWith( + expect.objectContaining({ + data: expect.objectContaining({ + archived: true, + }), + }), + ); + }); + + test("should set archived=false when action is MARK_READ and markDone=true", async () => { + const request = createMockRequest({ + ...validBody, + action: "MARK_READ", + markDone: true, + }); + await POST(request); + + expect(prisma.cleanupThread.create).toHaveBeenCalledWith( + expect.objectContaining({ + data: expect.objectContaining({ + archived: false, + }), + }), + ); + }); + }); +}); diff --git a/apps/web/app/api/clean/gmail/simple/route.ts b/apps/web/app/api/clean/gmail/simple/route.ts new file mode 100644 index 0000000000..bc53f4d0e5 --- /dev/null +++ b/apps/web/app/api/clean/gmail/simple/route.ts @@ -0,0 +1,182 @@ +import 
{ NextResponse } from "next/server"; +import { headers } from "next/headers"; +import { z } from "zod"; +import { withError, type RequestWithLogger } from "@/utils/middleware"; +import { getGmailClientWithRefresh } from "@/utils/gmail/client"; +import { GmailLabel, labelThread } from "@/utils/gmail/label"; +import { SafeError } from "@/utils/error"; +import prisma from "@/utils/prisma"; +import { isDefined } from "@/utils/types"; +import type { Logger } from "@/utils/logger"; +import { CleanAction } from "@/generated/prisma/enums"; +import { updateThread } from "@/utils/redis/clean"; +import { env } from "@/env"; +import { isValidInternalApiKey } from "@/utils/internal-api"; + +const cleanGmailSchema = z.object({ + emailAccountId: z.string(), + threadId: z.string(), + markDone: z.boolean(), + action: z.enum([CleanAction.ARCHIVE, CleanAction.MARK_READ]), + markedDoneLabelId: z.string().optional(), + processedLabelId: z.string().optional(), + jobId: z.string(), +}); +type CleanGmailBody = z.infer<typeof cleanGmailSchema>; + +/** + * Applies Gmail label changes to a thread based on the clean decision. + * Handles archive (remove INBOX) or mark-read (remove UNREAD) actions, + * adds processed/done labels, and persists the result. 
+ */ +async function performGmailAction({ + emailAccountId, + threadId, + markDone, + markedDoneLabelId, + processedLabelId, + jobId, + action, + logger, +}: CleanGmailBody & { logger: Logger }) { + const account = await prisma.emailAccount.findUnique({ + where: { id: emailAccountId }, + select: { + account: { + select: { + access_token: true, + refresh_token: true, + expires_at: true, + }, + }, + }, + }); + + if (!account) throw new SafeError("User not found", 404); + if (!account.account?.access_token || !account.account?.refresh_token) + throw new SafeError("No Gmail account found", 404); + + const gmail = await getGmailClientWithRefresh({ + accessToken: account.account.access_token, + refreshToken: account.account.refresh_token, + expiresAt: account.account.expires_at?.getTime() || null, + emailAccountId, + }); + + const shouldArchive = markDone && action === CleanAction.ARCHIVE; + const shouldMarkAsRead = markDone && action === CleanAction.MARK_READ; + + const addLabelIds = [ + processedLabelId, + markDone ? markedDoneLabelId : undefined, + ].filter(isDefined); + const removeLabelIds = [ + shouldArchive ? GmailLabel.INBOX : undefined, + shouldMarkAsRead ? GmailLabel.UNREAD : undefined, + ].filter(isDefined); + + logger.info("Handling thread", { threadId, shouldArchive, shouldMarkAsRead }); + + await labelThread({ + gmail, + threadId, + addLabelIds, + removeLabelIds, + }); + + await saveCleanResult({ + emailAccountId, + threadId, + archived: shouldArchive, + jobId, + }); +} + +/** + * Saves the clean result to both Redis (for real-time status) and the database. 
+ */ +async function saveCleanResult({ + emailAccountId, + threadId, + archived, + jobId, +}: { + emailAccountId: string; + threadId: string; + archived: boolean; + jobId: string; +}) { + await Promise.all([ + updateThread({ + emailAccountId, + jobId, + threadId, + update: { status: "completed" }, + }), + saveToDatabase({ + emailAccountId, + threadId, + archive: archived, + jobId, + }), + ]); +} + +/** + * Persists the cleanup thread record to the database for history tracking. + */ +async function saveToDatabase({ + emailAccountId, + threadId, + archive, + jobId, +}: { + emailAccountId: string; + threadId: string; + archive: boolean; + jobId: string; +}) { + await prisma.cleanupThread.create({ + data: { + emailAccount: { connect: { id: emailAccountId } }, + threadId, + archived: archive, + job: { connect: { id: jobId } }, + }, + }); +} + +// Alternative endpoint for self-hosted deployments without Qstash. +// Disabled when QSTASH_TOKEN is set (returns 403). +// Authenticates via internal API key instead of Qstash signature verification. +// +// Security note: The internal API key provides blanket access to all email accounts. +// This mirrors the QStash routes which trust signed requests without per-account validation. +// The internal API key is a trusted service credential for self-hosted deployments. +export const POST = withError( + "clean/gmail/simple", + async (request: Request) => { + if (env.QSTASH_TOKEN) { + return NextResponse.json( + { error: "Qstash is set. This endpoint is disabled." 
}, + { status: 403 }, + ); + } + + const requestLogger = (request as RequestWithLogger).logger; + + if (!isValidInternalApiKey(await headers(), requestLogger)) { + return NextResponse.json({ error: "Invalid API key" }, { status: 401 }); + } + + const json = await request.json(); + const body = cleanGmailSchema.parse(json); + + await performGmailAction({ + ...body, + logger: requestLogger, + }); + + return NextResponse.json({ success: true }); + }, +); diff --git a/apps/web/app/api/clean/simple/route.test.ts b/apps/web/app/api/clean/simple/route.test.ts new file mode 100644 index 0000000000..1a7172effd --- /dev/null +++ b/apps/web/app/api/clean/simple/route.test.ts @@ -0,0 +1,192 @@ +// Mock server-only as per testing guidelines +vi.mock("server-only", () => ({})); + +// Mock env +vi.mock("@/env", () => ({ + env: { + QSTASH_TOKEN: undefined, + INTERNAL_API_KEY: "test-internal-key", + EMAIL_ENCRYPT_SECRET: "test-secret", + EMAIL_ENCRYPT_SALT: "test-salt", + }, +})); + +// Mock encryption to avoid env issues +vi.mock("@/utils/encryption", () => ({ + encrypt: vi.fn((val) => val), + decrypt: vi.fn((val) => val), +})); + +// Mock auth to avoid encryption chain +vi.mock("@/utils/auth", () => ({})); + +// Mock next/headers +vi.mock("next/headers", () => ({ + headers: vi.fn(() => new Headers()), +})); + +// Mock Redis +vi.mock("@/utils/redis", () => ({ + redis: null, +})); + +// Mock internal-api +vi.mock("@/utils/internal-api", () => ({ + isValidInternalApiKey: vi.fn(), +})); + +// Mock Prisma +vi.mock("@/utils/prisma", () => ({ + default: {}, +})); + +// Mock heavy dependencies to avoid loading the entire app +vi.mock("@/utils/gmail/thread", () => ({ + getThreadMessages: vi.fn(), +})); + +vi.mock("@/utils/gmail/client", () => ({ + getGmailClientWithRefresh: vi.fn(), +})); + +vi.mock("@/utils/user/get", () => ({ + getEmailAccountWithAiAndTokens: vi.fn(), + getUserPremium: vi.fn(), +})); + +vi.mock("@/utils/ai/clean/ai-clean", () => ({ + aiClean: vi.fn(), +})); + 
+vi.mock("@/utils/redis/clean", () => ({ + saveThread: vi.fn(), + updateThread: vi.fn(), +})); + +vi.mock("@/utils/upstash", () => ({ + publishToQstash: vi.fn(), +})); + +vi.mock("@/utils/premium", () => ({ + isActivePremium: vi.fn(), +})); + +import { NextRequest } from "next/server"; +import { beforeEach, describe, expect, test, vi } from "vitest"; +import { isValidInternalApiKey } from "@/utils/internal-api"; +import { env } from "@/env"; +import { POST } from "./route"; + +const mockIsValidInternalApiKey = vi.mocked(isValidInternalApiKey); + +describe("/api/clean/simple", () => { + beforeEach(() => { + vi.clearAllMocks(); + // Reset env mock + (env as { QSTASH_TOKEN: string | undefined }).QSTASH_TOKEN = undefined; + }); + + const createMockRequest = (body: Record<string, unknown>) => { + return new NextRequest("http://localhost/api/clean/simple", { + method: "POST", + headers: { + "Content-Type": "application/json", + "x-api-key": "test-api-key", + }, + body: JSON.stringify(body), + }); + }; + + const validBody = { + emailAccountId: "test-email-account-id", + threadId: "test-thread-id", + markedDoneLabelId: "label-1", + processedLabelId: "label-2", + jobId: "job-1", + action: "ARCHIVE", + skips: { + reply: true, + starred: true, + calendar: true, + }, + }; + + describe("QStash guard", () => { + test("should return 403 when QSTASH_TOKEN is set", async () => { + (env as { QSTASH_TOKEN: string | undefined }).QSTASH_TOKEN = + "qstash-token"; + + const request = createMockRequest(validBody); + const response = await POST(request); + const responseBody = await response.json(); + + expect(response.status).toBe(403); + expect(responseBody.error).toBe( + "Qstash is set. 
This endpoint is disabled.", + ); + }); + + test("should proceed when QSTASH_TOKEN is not set", async () => { + mockIsValidInternalApiKey.mockReturnValue(false); + + const request = createMockRequest(validBody); + const response = await POST(request); + + // Should get past QStash check and fail on API key + expect(response.status).toBe(401); + }); + }); + + describe("API key validation", () => { + test("should return 401 when API key is invalid", async () => { + mockIsValidInternalApiKey.mockReturnValue(false); + + const request = createMockRequest(validBody); + const response = await POST(request); + const responseBody = await response.json(); + + expect(response.status).toBe(401); + expect(responseBody.error).toBe("Invalid API key"); + }); + + test("should proceed when API key is valid", async () => { + mockIsValidInternalApiKey.mockReturnValue(true); + + // Will fail on next step (missing mocks for cleanThread dependencies) + // but this proves API key validation passed + const request = createMockRequest(validBody); + const response = await POST(request); + + // Should get past API key check + expect(response.status).not.toBe(401); + expect(mockIsValidInternalApiKey).toHaveBeenCalled(); + }); + }); + + describe("Input validation", () => { + test("should return 400 when body is missing required fields", async () => { + mockIsValidInternalApiKey.mockReturnValue(true); + + const request = createMockRequest({}); + const response = await POST(request); + const responseBody = await response.json(); + + expect(response.status).toBe(400); + expect(responseBody.error).toBeDefined(); + }); + + test("should return 400 when action is invalid", async () => { + mockIsValidInternalApiKey.mockReturnValue(true); + + const request = createMockRequest({ + ...validBody, + action: "INVALID_ACTION", + }); + const response = await POST(request); + const responseBody = await response.json(); + + expect(response.status).toBe(400); + expect(responseBody.error).toBeDefined(); + }); + 
}); +}); diff --git a/apps/web/app/api/clean/simple/route.ts b/apps/web/app/api/clean/simple/route.ts new file mode 100644 index 0000000000..33b3da6af7 --- /dev/null +++ b/apps/web/app/api/clean/simple/route.ts @@ -0,0 +1,337 @@ +import { z } from "zod"; +import { NextResponse } from "next/server"; +import { headers } from "next/headers"; +import { withError, type RequestWithLogger } from "@/utils/middleware"; +import { publishToQstash } from "@/utils/upstash"; +import { getThreadMessages } from "@/utils/gmail/thread"; +import { getGmailClientWithRefresh } from "@/utils/gmail/client"; +import { SafeError } from "@/utils/error"; +import type { Logger } from "@/utils/logger"; +import { aiClean } from "@/utils/ai/clean/ai-clean"; +import { getEmailForLLM } from "@/utils/get-email-from-message"; +import { + getEmailAccountWithAiAndTokens, + getUserPremium, +} from "@/utils/user/get"; +import { findUnsubscribeLink } from "@/utils/parse/parseHtml.server"; +import { getCalendarEventStatus } from "@/utils/parse/calender-event"; +import { GmailLabel } from "@/utils/gmail/label"; +import { isNewsletterSender } from "@/utils/ai/group/find-newsletters"; +import { isMaybeReceipt, isReceipt } from "@/utils/ai/group/find-receipts"; +import { saveThread, updateThread } from "@/utils/redis/clean"; +import { internalDateToDate } from "@/utils/date"; +import { CleanAction } from "@/generated/prisma/enums"; +import type { ParsedMessage } from "@/utils/types"; +import { isActivePremium } from "@/utils/premium"; +import { env } from "@/env"; +import { isValidInternalApiKey } from "@/utils/internal-api"; + +const cleanThreadBody = z.object({ + emailAccountId: z.string(), + threadId: z.string(), + markedDoneLabelId: z.string(), + processedLabelId: z.string(), + jobId: z.string(), + action: z.enum([CleanAction.ARCHIVE, CleanAction.MARK_READ]), + instructions: z.string().optional(), + skips: z.object({ + reply: z.boolean().default(true).nullish(), + starred: 
z.boolean().default(true).nullish(), + calendar: z.boolean().default(true).nullish(), + receipt: z.boolean().default(false).nullish(), + attachment: z.boolean().default(false).nullish(), + conversation: z.boolean().default(false).nullish(), + }), +}); +type CleanThreadBody = z.infer<typeof cleanThreadBody>; + +type CleanGmailBody = { + emailAccountId: string; + threadId: string; + markDone: boolean; + action: CleanAction; + markedDoneLabelId?: string; + processedLabelId?: string; + jobId: string; +}; + +/** + * Processes a Gmail thread for the Deep Clean feature. + * Applies static rules (starred, attachments, receipts, calendar, newsletters) + * before falling back to LLM-based decision making. + */ +async function cleanThread({ + emailAccountId, + threadId, + markedDoneLabelId, + processedLabelId, + jobId, + action, + instructions, + skips, + logger, +}: CleanThreadBody & { logger: Logger }) { + const emailAccount = await getEmailAccountWithAiAndTokens({ + emailAccountId, + }); + + if (!emailAccount) throw new SafeError("User not found", 404); + + if (!emailAccount.tokens) throw new SafeError("No Gmail account found", 404); + if (!emailAccount.tokens.access_token || !emailAccount.tokens.refresh_token) + throw new SafeError("No Gmail account found", 404); + + const premium = await getUserPremium({ userId: emailAccount.userId }); + if (!isActivePremium(premium)) throw new SafeError("User not premium"); + + const gmail = await getGmailClientWithRefresh({ + accessToken: emailAccount.tokens.access_token, + refreshToken: emailAccount.tokens.refresh_token, + expiresAt: emailAccount.tokens.expires_at, + emailAccountId, + }); + + const messages = await getThreadMessages(threadId, gmail); + + logger.info("Fetched messages", { + emailAccountId, + threadId, + messageCount: messages.length, + }); + + const lastMessage = messages[messages.length - 1]; + if (!lastMessage) return; + + await saveThread({ + emailAccountId, + thread: { + threadId, + jobId, + subject: lastMessage.headers.subject, + from: 
lastMessage.headers.from, + snippet: lastMessage.snippet, + date: internalDateToDate(lastMessage.internalDate), + }, + }); + + const publish = getPublish({ + emailAccountId, + threadId, + markedDoneLabelId, + processedLabelId, + jobId, + action, + logger, + }); + + function isStarred(message: ParsedMessage) { + return message.labelIds?.includes(GmailLabel.STARRED); + } + + function isSent(message: ParsedMessage) { + return message.labelIds?.includes(GmailLabel.SENT); + } + + function hasAttachments(message: ParsedMessage) { + return message.attachments && message.attachments.length > 0; + } + + function hasUnsubscribeLink(message: ParsedMessage) { + return ( + findUnsubscribeLink(message.textHtml) || + message.headers["list-unsubscribe"] + ); + } + + function hasSentMail(message: ParsedMessage) { + return message.labelIds?.includes(GmailLabel.SENT); + } + + let needsLLMCheck = false; + + // Run through static rules before running against our LLM + for (const message of messages) { + // Skip if message is starred and skipStarred is true + if (skips.starred && isStarred(message)) { + await publish({ markDone: false }); + return; + } + + // Skip conversations + if (skips.conversation && isSent(message)) { + await publish({ markDone: false }); + return; + } + + // Skip if message has attachments and skipAttachment is true + if (skips.attachment && hasAttachments(message)) { + await publish({ markDone: false }); + return; + } + + // receipt + if (skips.receipt) { + if (isReceipt(message)) { + await publish({ markDone: false }); + return; + } + + if (isMaybeReceipt(message)) { + // check with llm + needsLLMCheck = true; + break; + } + } + + // calendar invite + const calendarEventStatus = getCalendarEventStatus(message); + if (skips.calendar && calendarEventStatus.isEvent) { + if (calendarEventStatus.timing === "past") { + await publish({ markDone: true }); + return; + } + + if (calendarEventStatus.timing === "future") { + await publish({ markDone: false }); + return; + 
} + } + + // unsubscribe link + if (!hasSentMail(message) && hasUnsubscribeLink(message)) { + await publish({ markDone: true }); + return; + } + + // newsletter + if (!hasSentMail(message) && isNewsletterSender(message.headers.from)) { + await publish({ markDone: true }); + return; + } + } + + // promotion/social/update + if ( + !needsLLMCheck && + lastMessage.labelIds?.some( + (label) => + label === GmailLabel.SOCIAL || + label === GmailLabel.PROMOTIONS || + label === GmailLabel.UPDATES || + label === GmailLabel.FORUMS, + ) + ) { + await publish({ markDone: true }); + return; + } + + // llm check + const aiResult = await aiClean({ + emailAccount, + messageId: lastMessage.id, + messages: messages.map((m) => getEmailForLLM(m)), + instructions, + skips, + }); + + await publish({ markDone: aiResult.archive }); +} + +/** + * Creates a publish function that enqueues Gmail label changes. + * Uses publishToQstash which has built-in fallback to /simple routes + * when QStash is not configured. + */ +function getPublish({ + emailAccountId, + threadId, + markedDoneLabelId, + processedLabelId, + jobId, + action, + logger, +}: { + emailAccountId: string; + threadId: string; + markedDoneLabelId: string; + processedLabelId: string; + jobId: string; + action: CleanAction; + logger: Logger; +}) { + return async ({ markDone }: { markDone: boolean }) => { + // Gmail API rate limits: https://developers.google.com/gmail/api/reference/quota + // 15,000 quota units per user per minute, modify thread = 10 units + // => ~25 modify threads/second max, with headroom => 12/second + // Each clean operation does 2 label modifications (remove inbox + add processed) + const actionCount = 2; + const maxRatePerSecond = Math.ceil(12 / actionCount); + + const cleanGmailBody: CleanGmailBody = { + emailAccountId, + threadId, + markDone, + action, + markedDoneLabelId, + processedLabelId, + jobId, + }; + + logger.info("Publishing to Qstash", { + emailAccountId, + threadId, + maxRatePerSecond, + 
markDone, + }); + + await Promise.all([ + publishToQstash("/api/clean/gmail", cleanGmailBody, { + key: `gmail-action-${emailAccountId}`, + ratePerSecond: maxRatePerSecond, + }), + updateThread({ + emailAccountId, + jobId, + threadId, + update: { + archive: markDone, + status: "applying", + }, + }), + ]); + + logger.info("Published to Qstash", { emailAccountId, threadId }); + }; +} + +// Alternative endpoint for self-hosted deployments without Qstash. +// Disabled when QSTASH_TOKEN is set (returns 403). +// Authenticates via internal API key instead of Qstash signature verification. +// +// Security note: The internal API key provides blanket access to all email accounts. +// This mirrors the QStash routes which trust signed requests without per-account validation. +// The internal API key is a trusted service credential for self-hosted deployments. +export const POST = withError("clean/simple", async (request: Request) => { + if (env.QSTASH_TOKEN) { + return NextResponse.json( + { error: "Qstash is set. This endpoint is disabled." 
}, + { status: 403 }, + ); + } + + const requestLogger = (request as RequestWithLogger).logger; + + if (!isValidInternalApiKey(await headers(), requestLogger)) { + return NextResponse.json({ error: "Invalid API key" }, { status: 401 }); + } + + const json = await request.json(); + const body = cleanThreadBody.parse(json); + + await cleanThread({ + ...body, + logger: requestLogger, + }); + + return NextResponse.json({ success: true }); +}); diff --git a/apps/web/package.json b/apps/web/package.json index 3bed85d67f..4773d8c78f 100644 --- a/apps/web/package.json +++ b/apps/web/package.json @@ -90,6 +90,7 @@ "@tiptap/starter-kit": "2.26.1", "@tiptap/suggestion": "2.26.1", "@upstash/qstash": "2.8.4", + "@upstash/ratelimit": "2.0.7", "@upstash/redis": "1.35.7", "@vercel/analytics": "1.6.1", "@vercel/speed-insights": "1.3.1", diff --git a/apps/web/utils/redis/rate-limit.test.ts b/apps/web/utils/redis/rate-limit.test.ts new file mode 100644 index 0000000000..da978f4789 --- /dev/null +++ b/apps/web/utils/redis/rate-limit.test.ts @@ -0,0 +1,84 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; + +// Use vi.hoisted() to declare mocks that can be used in vi.mock factories +const { mockBlockUntilReady } = vi.hoisted(() => ({ + mockBlockUntilReady: vi.fn(), +})); + +// Mock @upstash/ratelimit before importing the module +vi.mock("@upstash/ratelimit", () => { + // Create a proper class mock + class RatelimitMock { + static slidingWindow = vi.fn(); + blockUntilReady = mockBlockUntilReady; + } + return { Ratelimit: RatelimitMock }; +}); + +// Mock redis +vi.mock("@/utils/redis", () => ({ + redis: {}, +})); + +import { acquireRateLimitToken } from "./rate-limit"; + +describe("acquireRateLimitToken", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("should successfully acquire token when rate limit allows", async () => { + mockBlockUntilReady.mockResolvedValue({ success: true }); + + await expect( + acquireRateLimitToken("test-account-id"), + 
).resolves.toBeUndefined(); + + expect(mockBlockUntilReady).toHaveBeenCalledWith( + "test-account-id", + 30_000, // default timeout + ); + }); + + it("should use custom timeout when provided", async () => { + mockBlockUntilReady.mockResolvedValue({ success: true }); + + await acquireRateLimitToken("test-account-id", 60_000); + + expect(mockBlockUntilReady).toHaveBeenCalledWith("test-account-id", 60_000); + }); + + it("should throw error when rate limit timeout is exceeded", async () => { + mockBlockUntilReady.mockResolvedValue({ success: false }); + + await expect(acquireRateLimitToken("test-account-id")).rejects.toThrow( + "Rate limit timeout after 30000ms for test-account-id", + ); + }); + + it("should throw error with custom timeout in message", async () => { + mockBlockUntilReady.mockResolvedValue({ success: false }); + + await expect( + acquireRateLimitToken("test-account-id", 5000), + ).rejects.toThrow("Rate limit timeout after 5000ms for test-account-id"); + }); + + it("should use unique identifier for per-account rate limiting", async () => { + mockBlockUntilReady.mockResolvedValue({ success: true }); + + await acquireRateLimitToken("account-123"); + await acquireRateLimitToken("account-456"); + + expect(mockBlockUntilReady).toHaveBeenNthCalledWith( + 1, + "account-123", + 30_000, + ); + expect(mockBlockUntilReady).toHaveBeenNthCalledWith( + 2, + "account-456", + 30_000, + ); + }); +}); diff --git a/apps/web/utils/redis/rate-limit.ts b/apps/web/utils/redis/rate-limit.ts new file mode 100644 index 0000000000..6bb93ed837 --- /dev/null +++ b/apps/web/utils/redis/rate-limit.ts @@ -0,0 +1,34 @@ +import { Ratelimit } from "@upstash/ratelimit"; +import { redis } from "@/utils/redis"; + +// Create rate limiter with sliding window algorithm. +// 6 requests per 1 second - used as fallback when QStash is unavailable +// to prevent overwhelming Gmail API limits. 
+const gmailRateLimiter = new Ratelimit({ + redis, + limiter: Ratelimit.slidingWindow(6, "1 s"), + prefix: "ratelimit:gmail", +}); + +/** + * Acquires a rate limit token, blocking until available. + * Uses atomic Redis operations via Upstash Ratelimit to prevent race conditions. + * + * @param identifier - Unique identifier for rate limiting scope (e.g., emailAccountId) + * @param timeout - Maximum time to wait in milliseconds (default: 30000ms / 30 seconds) + * @returns Promise that resolves when token is acquired + * @throws {Error} If rate limit token cannot be acquired within the timeout period + */ +export async function acquireRateLimitToken( + identifier: string, + timeout = 30_000, +): Promise<void> { + const { success } = await gmailRateLimiter.blockUntilReady( + identifier, + timeout, + ); + + if (!success) { + throw new Error(`Rate limit timeout after ${timeout}ms for ${identifier}`); + } +} diff --git a/apps/web/utils/upstash/index.test.ts b/apps/web/utils/upstash/index.test.ts new file mode 100644 index 0000000000..9a9e1b0f11 --- /dev/null +++ b/apps/web/utils/upstash/index.test.ts @@ -0,0 +1,253 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; + +// Use vi.hoisted() to declare mocks that can be used in vi.mock factories +const { mockAcquireRateLimitToken, mockFetch, mockLoggerError } = vi.hoisted( + () => ({ + mockAcquireRateLimitToken: vi.fn(), + mockFetch: vi.fn(), + mockLoggerError: vi.fn(), + }), +); + +// Mock env - must be before imports +vi.mock("@/env", () => ({ + env: { + QSTASH_TOKEN: undefined as string | undefined, + INTERNAL_API_KEY: "test-internal-key", + NEXT_PUBLIC_BASE_URL: "http://localhost:3000", + }, +})); + +// Mock rate limiting +vi.mock("@/utils/redis/rate-limit", () => ({ + acquireRateLimitToken: mockAcquireRateLimitToken, +})); + +// Mock internal-api - use actual header name from implementation +vi.mock("@/utils/internal-api", () => ({ + INTERNAL_API_KEY_HEADER: "x-api-key", + getInternalApiUrl: vi.fn(() => 
"http://localhost:3000"), +})); + +// Mock logger +vi.mock("@/utils/logger", () => ({ + createScopedLogger: vi.fn(() => ({ + warn: vi.fn(), + info: vi.fn(), + error: mockLoggerError, + })), +})); + +// Mock fetch globally +vi.stubGlobal("fetch", mockFetch); + +import { env } from "@/env"; +import { publishToQstash, bulkPublishToQstash, publishToQstashQueue } from "."; + +describe("QStash fallback behavior", () => { + beforeEach(() => { + vi.clearAllMocks(); + mockAcquireRateLimitToken.mockResolvedValue(undefined); + mockFetch.mockResolvedValue(Response.json({ success: true })); + // Ensure QSTASH_TOKEN is undefined for fallback tests + (env as { QSTASH_TOKEN: string | undefined }).QSTASH_TOKEN = undefined; + }); + + describe("publishToQstash", () => { + it("should use emailAccountId from body for rate limiting", async () => { + const body = { + emailAccountId: "account-123", + threadId: "thread-456", + }; + + await publishToQstash("/api/clean", body); + + expect(mockAcquireRateLimitToken).toHaveBeenCalledWith("account-123"); + }); + + it("should fall back to 'global' when no emailAccountId in body", async () => { + const body = { + someOtherField: "value", + }; + + await publishToQstash("/api/clean", body); + + expect(mockAcquireRateLimitToken).toHaveBeenCalledWith("global"); + }); + + it("should fall back to 'global' when emailAccountId is empty string", async () => { + const body = { + emailAccountId: "", + threadId: "thread-456", + }; + + await publishToQstash("/api/clean", body); + + expect(mockAcquireRateLimitToken).toHaveBeenCalledWith("global"); + }); + + it("should append /simple to the URL for fallback path", async () => { + const body = { emailAccountId: "account-123" }; + + await publishToQstash("/api/clean", body); + + expect(mockFetch).toHaveBeenCalledWith( + "http://localhost:3000/api/clean/simple", + expect.objectContaining({ + method: "POST", + }), + ); + }); + + it("should include internal API key header in fallback request", async () => { + const 
body = { emailAccountId: "account-123" }; + + await publishToQstash("/api/clean", body); + + expect(mockFetch).toHaveBeenCalledWith( + expect.any(String), + expect.objectContaining({ + headers: expect.objectContaining({ + "Content-Type": "application/json", + "x-api-key": "test-internal-key", + }), + }), + ); + }); + + it("should JSON stringify the body in fallback request", async () => { + const body = { + emailAccountId: "account-123", + threadId: "thread-456", + markDone: true, + }; + + await publishToQstash("/api/clean", body); + + expect(mockFetch).toHaveBeenCalledWith( + expect.any(String), + expect.objectContaining({ + body: JSON.stringify(body), + }), + ); + }); + }); + + describe("error handling", () => { + it("should propagate error when rate limit acquisition fails", async () => { + const error = new Error( + "Rate limit timeout after 30000ms for account-123", + ); + mockAcquireRateLimitToken.mockRejectedValue(error); + + const body = { emailAccountId: "account-123" }; + + await expect(publishToQstash("/api/clean", body)).rejects.toThrow( + "Rate limit timeout after 30000ms for account-123", + ); + + // Verify fetch was NOT called since rate limiting failed + expect(mockFetch).not.toHaveBeenCalled(); + }); + + it("should log error when fetch fails but not throw", async () => { + const fetchError = new Error("Network error"); + mockFetch.mockRejectedValue(fetchError); + + const body = { emailAccountId: "account-123" }; + + // Should not throw - fire-and-forget with error logging + await expect( + publishToQstash("/api/clean", body), + ).resolves.toBeUndefined(); + + // Wait for the catch handler to execute + await new Promise((resolve) => setTimeout(resolve, 10)); + + expect(mockLoggerError).toHaveBeenCalledWith( + "Fallback fetch failed", + expect.objectContaining({ + url: "http://localhost:3000/api/clean/simple", + error: "Network error", + rateLimitKey: "account-123", + }), + ); + }); + }); + + describe("bulkPublishToQstash", () => { + it("should rate 
limit each item sequentially in fallback mode", async () => { + const items = [ + { + url: "http://localhost:3000/api/clean", + body: { emailAccountId: "account-1" }, + }, + { + url: "http://localhost:3000/api/clean", + body: { emailAccountId: "account-2" }, + }, + { + url: "http://localhost:3000/api/clean", + body: { emailAccountId: "account-3" }, + }, + ]; + + await bulkPublishToQstash({ items }); + + expect(mockAcquireRateLimitToken).toHaveBeenCalledTimes(3); + expect(mockAcquireRateLimitToken).toHaveBeenNthCalledWith(1, "account-1"); + expect(mockAcquireRateLimitToken).toHaveBeenNthCalledWith(2, "account-2"); + expect(mockAcquireRateLimitToken).toHaveBeenNthCalledWith(3, "account-3"); + }); + + it("should stop processing when rate limit fails mid-batch", async () => { + mockAcquireRateLimitToken + .mockResolvedValueOnce(undefined) + .mockRejectedValueOnce(new Error("Rate limit exceeded")) + .mockResolvedValueOnce(undefined); + + const items = [ + { + url: "http://localhost:3000/api/clean", + body: { emailAccountId: "account-1" }, + }, + { + url: "http://localhost:3000/api/clean", + body: { emailAccountId: "account-2" }, + }, + { + url: "http://localhost:3000/api/clean", + body: { emailAccountId: "account-3" }, + }, + ]; + + await expect(bulkPublishToQstash({ items })).rejects.toThrow( + "Rate limit exceeded", + ); + + // Verify only first fetch was called before failure + expect(mockFetch).toHaveBeenCalledTimes(1); + }); + }); + + describe("publishToQstashQueue", () => { + it("should use rate limiting in fallback mode", async () => { + const body = { emailAccountId: "queue-account-123" }; + + await publishToQstashQueue({ + queueName: "test-queue", + parallelism: 1, + url: "http://localhost:3000/api/clean", + body, + }); + + expect(mockAcquireRateLimitToken).toHaveBeenCalledWith( + "queue-account-123", + ); + }); + }); +}); + +// Note: QStash client behavior (when QSTASH_TOKEN is set) is tested by: +// - The route tests which return 403 when QSTASH_TOKEN is set 
+// - The @upstash/qstash library's own test suite diff --git a/apps/web/utils/upstash/index.ts b/apps/web/utils/upstash/index.ts index 18e9164571..466d4d7810 100644 --- a/apps/web/utils/upstash/index.ts +++ b/apps/web/utils/upstash/index.ts @@ -4,7 +4,7 @@ import { INTERNAL_API_KEY_HEADER, getInternalApiUrl, } from "@/utils/internal-api"; -import { sleep } from "@/utils/sleep"; +import { acquireRateLimitToken } from "@/utils/redis/rate-limit"; import { createScopedLogger } from "@/utils/logger"; const logger = createScopedLogger("upstash"); @@ -80,11 +80,27 @@ export async function publishToQstashQueue({ return fallbackPublishToQstash(url, body); } +/** + * Fallback HTTP publisher for when QStash is unavailable. + * + * Applies rate limiting using emailAccountId from the body to prevent + * overwhelming downstream services. Uses fire-and-forget pattern with + * error logging. + * + * @param url - Target URL (will have `/simple` appended) + * @param body - Request body, optionally containing emailAccountId for rate limiting + */ async function fallbackPublishToQstash(url: string, body: T) { - // Fallback to fetch if Qstash client is not found - logger.warn("Qstash client not found"); + logger.warn("Qstash client not found, using fallback"); - // Don't await. Run in background + // Rate limit at the source to prevent overwhelming downstream services. + // Extract emailAccountId from body if available, otherwise use global key. + // Note: Use || instead of ?? 
to handle empty strings as falsy + const rateLimitKey = + (body as { emailAccountId?: string }).emailAccountId || "global"; + await acquireRateLimitToken(rateLimitKey); + + // Fire-and-forget with error logging fetch(`${url}/simple`, { method: "POST", headers: { @@ -92,9 +108,13 @@ async function fallbackPublishToQstash(url: string, body: T) { [INTERNAL_API_KEY_HEADER]: env.INTERNAL_API_KEY, }, body: JSON.stringify(body), + }).catch((error) => { + logger.error("Fallback fetch failed", { + url: `${url}/simple`, + error: error instanceof Error ? error.message : String(error), + rateLimitKey, + }); }); - // Wait for 100ms to ensure the request is sent - await sleep(100); } export async function listQueues() { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 2e66492420..d331dfee1c 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -319,6 +319,9 @@ importers: '@upstash/qstash': specifier: 2.8.4 version: 2.8.4 + '@upstash/ratelimit': + specifier: 2.0.7 + version: 2.0.7(@upstash/redis@1.35.7) '@upstash/redis': specifier: 1.35.7 version: 1.35.7 @@ -6228,9 +6231,18 @@ packages: '@ungap/structured-clone@1.3.0': resolution: {integrity: sha512-WmoN8qaIAo7WTYWbAZuG8PYEhn5fkz7dZrqTBZ7dtt//lL2Gwms1IcnQ5yHqjDfX8Ft5j4YzDM23f87zBfDe9g==} + '@upstash/core-analytics@0.0.10': + resolution: {integrity: sha512-7qJHGxpQgQr9/vmeS1PktEwvNAF7TI4iJDi8Pu2CFZ9YUGHZH4fOP5TfYlZ4aVxfopnELiE4BS4FBjyK7V1/xQ==} + engines: {node: '>=16.0.0'} + '@upstash/qstash@2.8.4': resolution: {integrity: sha512-iojHWUlRoC3M2e4XQ1NFEgC+7Orurrm5uIrPn5WilU7LvWQoocyjYBXR0VUalXkeMAmFyk4blF0EOYZY4igdIQ==} + '@upstash/ratelimit@2.0.7': + resolution: {integrity: sha512-qNQW4uBPKVk8c4wFGj2S/vfKKQxXx1taSJoSGBN36FeiVBBKHQgsjPbKUijZ9Xu5FyVK+pfiXWKIsQGyoje8Fw==} + peerDependencies: + '@upstash/redis': ^1.34.3 + '@upstash/redis@1.35.7': resolution: {integrity: sha512-bdCdKhke+kYUjcLLuGWSeQw7OLuWIx3eyKksyToLBAlGIMX9qiII0ptp8E0y7VFE1yuBxBd/3kSzJ8774Q4g+A==} @@ -8435,6 +8447,7 @@ packages: get-random-values-esm@1.0.2: 
resolution: {integrity: sha512-HMSDTgj1HPFAuZG0FqxzHbYt5JeEGDUeT9r1RLXhS6RZQS8rLRjokgjZ0Pd28CN0lhXlRwfH6eviZqZEJ2kIoA==} + deprecated: use crypto.getRandomValues() instead get-random-values@1.2.2: resolution: {integrity: sha512-lMyPjQyl0cNNdDf2oR+IQ/fM3itDvpoHy45Ymo2r0L1EjazeSl13SfbKZs7KtZ/3MDCeueiaJiuOEfKqRTsSgA==} @@ -19670,12 +19683,21 @@ snapshots: '@ungap/structured-clone@1.3.0': {} + '@upstash/core-analytics@0.0.10': + dependencies: + '@upstash/redis': 1.35.7 + '@upstash/qstash@2.8.4': dependencies: crypto-js: 4.2.0 jose: 5.10.0 neverthrow: 7.2.0 + '@upstash/ratelimit@2.0.7(@upstash/redis@1.35.7)': + dependencies: + '@upstash/core-analytics': 0.0.10 + '@upstash/redis': 1.35.7 + '@upstash/redis@1.35.7': dependencies: uncrypto: 0.1.3