import { describe, it, expect, vi, beforeEach } from "vitest";
import { processHistoryForUser } from "./process-history";
import { getHistory } from "@/utils/gmail/history";
import {
  getWebhookEmailAccount,
  validateWebhookAccount,
} from "@/utils/webhook/validate-webhook-account";
import { createScopedLogger } from "@/utils/logger";
import prisma from "@/utils/prisma";

const logger = createScopedLogger("test");
// `logger.with` hands back this very instance so that spies installed on
// `logger` also observe calls made through a derived logger.
vi.spyOn(logger, "with").mockReturnValue(logger);

vi.mock("server-only", () => ({}));

vi.mock("@/utils/gmail/client", () => ({
  getGmailClientWithRefresh: vi.fn().mockResolvedValue({}),
}));

vi.mock("@/utils/gmail/history", () => ({
  getHistory: vi.fn(),
}));

vi.mock("@/utils/webhook/validate-webhook-account", () => ({
  getWebhookEmailAccount: vi.fn(),
  validateWebhookAccount: vi.fn(),
}));

vi.mock("@/utils/prisma", () => ({
  default: {
    emailAccount: {
      update: vi.fn().mockResolvedValue({}),
    },
  },
}));

vi.mock("@/utils/error", () => ({
  captureException: vi.fn(),
}));

/** Address used for the webhook payload and the stored account. */
const TEST_EMAIL = "user@test.com";
/** History id delivered by the simulated webhook (stored id is "1000"). */
const WEBHOOK_HISTORY_ID = 2000;

/**
 * Primes the account lookup and validation mocks with an account whose
 * `lastSyncedHistoryId` is "1000", no rules and no AI access.
 */
function stubValidatedAccount(): void {
  const storedAccount = {
    id: "account-123",
    email: TEST_EMAIL,
    lastSyncedHistoryId: "1000",
  };

  const validationResult = {
    success: true,
    data: {
      emailAccount: {
        ...storedAccount,
        account: {
          access_token: "token",
          refresh_token: "refresh",
          expires_at: new Date(Date.now() + 3_600_000),
        },
        rules: [],
      },
      hasAutomationRules: false,
      hasAiAccess: false,
    },
  };

  vi.mocked(getWebhookEmailAccount).mockResolvedValue(storedAccount as any);
  vi.mocked(validateWebhookAccount).mockResolvedValue(validationResult as any);
}

describe("processHistoryForUser - 404 Handling", () => {
  beforeEach(() => {
    vi.clearAllMocks();
  });

  it("should reset lastSyncedHistoryId when Gmail returns 404 (expired historyId)", async () => {
    stubValidatedAccount();

    // Gmail rejects the history listing with a 404-status error.
    const notFound = Object.assign(
      new Error("Requested entity was not found"),
      { status: 404 },
    );
    vi.mocked(getHistory).mockRejectedValue(notFound);

    const response = await processHistoryForUser(
      { emailAddress: TEST_EMAIL, historyId: WEBHOOK_HISTORY_ID },
      {},
      logger,
    );

    const payload = await (response as any).json();
    expect(payload).toEqual({ ok: true });

    // The sync pointer must now be the history id carried by the webhook.
    expect(prisma.emailAccount.update).toHaveBeenCalledWith({
      where: { id: "account-123" },
      data: { lastSyncedHistoryId: "2000" },
    });
  });

  it("should log a warning when history items are skipped due to large gap", async () => {
    // Stored id 1000 vs webhook id 2000: a gap of 1000.
    stubValidatedAccount();
    vi.mocked(getHistory).mockResolvedValue({ history: [] });

    const warnSpy = vi.spyOn(logger, "warn");

    await processHistoryForUser(
      { emailAddress: TEST_EMAIL, historyId: WEBHOOK_HISTORY_ID },
      {},
      logger,
    );

    const expectedDetails = expect.objectContaining({
      lastSyncedHistoryId: 1000,
      webhookHistoryId: 2000,
      skippedHistoryItems: 500, // (2000 - 500) - 1000 = 500
    });
    expect(warnSpy).toHaveBeenCalledWith(
      "Skipping history items due to large gap",
      expectedDetails,
    );
  });
});
cbcd4d9042..38cde7854f 100644 --- a/apps/web/app/api/google/webhook/process-history.ts +++ b/apps/web/app/api/google/webhook/process-history.ts @@ -13,9 +13,11 @@ import { getHistory } from "@/utils/gmail/history"; import { validateWebhookAccount, getWebhookEmailAccount, + type ValidatedWebhookAccountData, } from "@/utils/webhook/validate-webhook-account"; import prisma from "@/utils/prisma"; import type { Logger } from "@/utils/logger"; +import type { gmail_v1 } from "@googleapis/gmail"; export async function processHistoryForUser( decodedData: { @@ -76,29 +78,28 @@ export async function processHistoryForUser( logger, }); - const startHistoryId = - options?.startHistoryId || - Math.max( - Number.parseInt(emailAccount?.lastSyncedHistoryId || "0"), - historyId - 500, // avoid going too far back - ).toString(); - - logger.info("Listing history", { - startHistoryId, - lastSyncedHistoryId: emailAccount?.lastSyncedHistoryId, - gmailHistoryId: startHistoryId, + const historyResult = await fetchGmailHistoryResilient({ + gmail, + emailAccount, + webhookHistoryId: historyId, + options, + logger, }); - const history = await getHistory(gmail, { - // NOTE this can cause problems if we're way behind - // NOTE this doesn't include startHistoryId in the results - startHistoryId, - historyTypes: ["messageAdded", "labelAdded", "labelRemoved"], - maxResults: 500, - }); + if (historyResult.status === "expired") { + await updateLastSyncedHistoryId({ + emailAccountId: validatedEmailAccount.id, + lastSyncedHistoryId: historyId.toString(), + }); + return NextResponse.json({ ok: true }); + } + + const history = historyResult.data; if (history.history) { - logger.info("Processing history", { startHistoryId }); + logger.info("Processing history", { + startHistoryId: historyResult.startHistoryId, + }); await processHistory( { @@ -118,7 +119,9 @@ export async function processHistoryForUser( logger, ); } else { - logger.info("No history", { startHistoryId }); + logger.info("No history", { + 
/** Furthest distance behind the webhook's history id that listing may start from. */
const MAX_HISTORY_ID_LOOKBACK = 500;

/** Page size requested from the history listing. */
const HISTORY_PAGE_SIZE = 500;

/**
 * Reports whether `error` carries a 404 status.
 *
 * The status is looked up, in order, on `response.data.error.code`,
 * `response.status`, `status` and `code`; the first non-nullish value wins.
 *
 * @param error - Whatever was thrown by the history request.
 * @returns `true` only when the resolved status equals 404.
 */
function isHistoryIdExpiredError(error: unknown): boolean {
  // Non-object throwables (null, undefined, strings, ...) carry no status;
  // bail out instead of dereferencing them.
  if (typeof error !== "object" || error === null) return false;

  // biome-ignore lint/suspicious/noExplicitAny: simple
  const err = error as any;
  const statusCode =
    err.response?.data?.error?.code ??
    err.response?.status ??
    err.status ??
    err.code;

  // Compare numerically so a status delivered as the string "404" also matches.
  return Number(statusCode) === 404;
}

/**
 * Fetches history from Gmail with resilience:
 * 1. Limits how far back we go to avoid processing massive gaps
 *    (e.g. if a user is disconnected for months).
 * 2. Handles expired history IDs (404s) by reporting `"expired"` so the caller
 *    can reset the sync point.
 *
 * @param gmail - Gmail client passed through to `getHistory`.
 * @param emailAccount - Account whose `lastSyncedHistoryId` is the preferred start.
 * @param webhookHistoryId - History id delivered with the webhook.
 * @param options - An explicit `startHistoryId` overrides the computed one.
 * @param logger - Receives the "Listing history" info and the gap/expiry warnings.
 * @returns `{ status: "success", data, startHistoryId }` when the listing
 *   succeeds, or `{ status: "expired" }` when the request fails with a 404.
 * @throws Re-throws any non-404 error raised by `getHistory`.
 */
async function fetchGmailHistoryResilient({
  gmail,
  emailAccount,
  webhookHistoryId,
  options,
  logger,
}: {
  gmail: gmail_v1.Gmail;
  emailAccount: ValidatedWebhookAccountData;
  webhookHistoryId: number;
  options: { startHistoryId?: string };
  logger: Logger;
}): Promise<
  | {
      status: "success";
      data: Awaited<ReturnType<typeof getHistory>>;
      startHistoryId: string;
    }
  | { status: "expired" }
> {
  const parsedLastSynced = Number.parseInt(
    emailAccount?.lastSyncedHistoryId || "0",
    10,
  );
  // A non-numeric stored value would make Math.max return NaN and send
  // startHistoryId "NaN" to Gmail; treat it as "never synced" instead.
  const lastSyncedHistoryId = Number.isNaN(parsedLastSynced)
    ? 0
    : parsedLastSynced;

  // If the gap is too large, start from webhookHistoryId - MAX_HISTORY_ID_LOOKBACK.
  // This prevents timeouts and runaway processing costs if the system falls way behind.
  const startHistoryIdNum = Math.max(
    lastSyncedHistoryId,
    webhookHistoryId - MAX_HISTORY_ID_LOOKBACK,
  );
  const startHistoryId =
    options?.startHistoryId || startHistoryIdNum.toString();

  // Log if we are intentionally skipping emails to keep the system stable
  if (startHistoryIdNum > lastSyncedHistoryId && !options?.startHistoryId) {
    logger.warn("Skipping history items due to large gap", {
      lastSyncedHistoryId,
      webhookHistoryId,
      effectiveStartHistoryId: startHistoryIdNum,
      skippedHistoryItems: startHistoryIdNum - lastSyncedHistoryId,
    });
  }

  logger.info("Listing history", {
    startHistoryId,
    lastSyncedHistoryId: emailAccount?.lastSyncedHistoryId,
    gmailHistoryId: startHistoryId,
  });

  try {
    const data = await getHistory(gmail, {
      startHistoryId,
      historyTypes: ["messageAdded", "labelAdded", "labelRemoved"],
      maxResults: HISTORY_PAGE_SIZE,
    });
    return { status: "success", data, startHistoryId };
  } catch (error) {
    // Gmail history IDs are typically valid for ~1 week. If older, Gmail returns a 404.
    // In this case, the caller resets the sync point to the current history ID.
    if (isHistoryIdExpiredError(error)) {
      logger.warn("HistoryId expired, resetting to current", {
        expiredHistoryId: startHistoryId,
        newHistoryId: webhookHistoryId,
      });
      return { status: "expired" };
    }
    throw error;
  }
}