Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 137 additions & 0 deletions apps/web/app/api/google/webhook/process-history.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import { processHistoryForUser } from "./process-history";
import { getHistory } from "@/utils/gmail/history";
import {
getWebhookEmailAccount,
validateWebhookAccount,
} from "@/utils/webhook/validate-webhook-account";
import { createScopedLogger } from "@/utils/logger";
import prisma from "@/utils/prisma";

const logger = createScopedLogger("test");
// Mock logger.with to return the same logger instance so spies work
vi.spyOn(logger, "with").mockReturnValue(logger);

vi.mock("server-only", () => ({}));

vi.mock("@/utils/gmail/client", () => ({
getGmailClientWithRefresh: vi.fn().mockResolvedValue({}),
}));

vi.mock("@/utils/gmail/history", () => ({
getHistory: vi.fn(),
}));

vi.mock("@/utils/webhook/validate-webhook-account", () => ({
getWebhookEmailAccount: vi.fn(),
validateWebhookAccount: vi.fn(),
}));

vi.mock("@/utils/prisma", () => ({
default: {
emailAccount: {
update: vi.fn().mockResolvedValue({}),
},
},
}));

vi.mock("@/utils/error", () => ({
captureException: vi.fn(),
}));

describe("processHistoryForUser - 404 Handling", () => {
beforeEach(() => {
vi.clearAllMocks();
});

it("should reset lastSyncedHistoryId when Gmail returns 404 (expired historyId)", async () => {
const email = "user@test.com";
const historyId = 2000;
const emailAccount = {
id: "account-123",
email,
lastSyncedHistoryId: "1000",
};

vi.mocked(getWebhookEmailAccount).mockResolvedValue(emailAccount as any);
vi.mocked(validateWebhookAccount).mockResolvedValue({
success: true,
data: {
emailAccount: {
...emailAccount,
account: {
access_token: "token",
refresh_token: "refresh",
expires_at: new Date(Date.now() + 3_600_000),
},
rules: [],
},
hasAutomationRules: false,
hasAiAccess: false,
},
} as any);

// Simulate Gmail 404 error
const error404 = new Error("Requested entity was not found");
(error404 as any).status = 404;
vi.mocked(getHistory).mockRejectedValue(error404);

const result = await processHistoryForUser(
{ emailAddress: email, historyId },
{},
logger,
);

const jsonResponse = await (result as any).json();
expect(jsonResponse).toEqual({ ok: true });

// Verify lastSyncedHistoryId was updated to the current historyId
expect(prisma.emailAccount.update).toHaveBeenCalledWith({
where: { id: "account-123" },
data: { lastSyncedHistoryId: "2000" },
});
});

it("should log a warning when history items are skipped due to large gap", async () => {
const email = "user@test.com";
const historyId = 2000; // Gap of 1000 (2000 - 1000)
const emailAccount = {
id: "account-123",
email,
lastSyncedHistoryId: "1000",
};

vi.mocked(getWebhookEmailAccount).mockResolvedValue(emailAccount as any);
vi.mocked(validateWebhookAccount).mockResolvedValue({
success: true,
data: {
emailAccount: {
...emailAccount,
account: {
access_token: "token",
refresh_token: "refresh",
expires_at: new Date(Date.now() + 3_600_000),
},
rules: [],
},
hasAutomationRules: false,
hasAiAccess: false,
},
} as any);

vi.mocked(getHistory).mockResolvedValue({ history: [] });

const warnSpy = vi.spyOn(logger, "warn");

await processHistoryForUser({ emailAddress: email, historyId }, {}, logger);

expect(warnSpy).toHaveBeenCalledWith(
"Skipping history items due to large gap",
expect.objectContaining({
lastSyncedHistoryId: 1000,
webhookHistoryId: 2000,
skippedHistoryItems: 500, // (2000 - 500) - 1000 = 500
}),
);
});
});
130 changes: 110 additions & 20 deletions apps/web/app/api/google/webhook/process-history.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import { getHistory } from "@/utils/gmail/history";
import {
validateWebhookAccount,
getWebhookEmailAccount,
type ValidatedWebhookAccountData,
} from "@/utils/webhook/validate-webhook-account";
import prisma from "@/utils/prisma";
import type { Logger } from "@/utils/logger";
import type { gmail_v1 } from "@googleapis/gmail";

export async function processHistoryForUser(
decodedData: {
Expand Down Expand Up @@ -76,29 +78,28 @@ export async function processHistoryForUser(
logger,
});

const startHistoryId =
options?.startHistoryId ||
Math.max(
Number.parseInt(emailAccount?.lastSyncedHistoryId || "0"),
historyId - 500, // avoid going too far back
).toString();

logger.info("Listing history", {
startHistoryId,
lastSyncedHistoryId: emailAccount?.lastSyncedHistoryId,
gmailHistoryId: startHistoryId,
const historyResult = await fetchGmailHistoryResilient({
gmail,
emailAccount,
webhookHistoryId: historyId,
options,
logger,
});

const history = await getHistory(gmail, {
// NOTE this can cause problems if we're way behind
// NOTE this doesn't include startHistoryId in the results
startHistoryId,
historyTypes: ["messageAdded", "labelAdded", "labelRemoved"],
maxResults: 500,
});
if (historyResult.status === "expired") {
await updateLastSyncedHistoryId({
emailAccountId: validatedEmailAccount.id,
lastSyncedHistoryId: historyId.toString(),
});
return NextResponse.json({ ok: true });
}

const history = historyResult.data;

if (history.history) {
logger.info("Processing history", { startHistoryId });
logger.info("Processing history", {
startHistoryId: historyResult.startHistoryId,
});

await processHistory(
{
Expand All @@ -118,7 +119,9 @@ export async function processHistoryForUser(
logger,
);
} else {
logger.info("No history", { startHistoryId });
logger.info("No history", {
startHistoryId: historyResult.startHistoryId,
});

// important to save this or we can get into a loop with never receiving history
await updateLastSyncedHistoryId({
Expand Down Expand Up @@ -250,3 +253,90 @@ const isInboxOrSentMessage = (message: {

return false;
};

function isHistoryIdExpiredError(error: unknown): boolean {
// biome-ignore lint/suspicious/noExplicitAny: simple
const err = error as any;
const statusCode =
err.response?.data?.error?.code ??
err.response?.status ??
err.status ??
err.code;

return statusCode === 404;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: Normalize numeric inputs from external sources before use—coerce to numbers, check for NaN, and apply safe defaults—to prevent missed expiry detection and NaN propagation.

Suggested change
return statusCode === 404;
return statusCode === 404 || statusCode === "404";

🚀 Want me to fix this? Reply ex: "fix it for me".

}

/**
* Fetches history from Gmail with resilience:
* 1. Limits how far back we go to avoid processing massive gaps (e.g. if a user is disconnected for months).
* 2. Handles expired history IDs (404s) by resetting the sync point.
*/
async function fetchGmailHistoryResilient({
gmail,
emailAccount,
webhookHistoryId,
options,
logger,
}: {
gmail: gmail_v1.Gmail;
emailAccount: ValidatedWebhookAccountData;
webhookHistoryId: number;
options: { startHistoryId?: string };
logger: Logger;
}): Promise<
| {
status: "success";
data: Awaited<ReturnType<typeof getHistory>>;
startHistoryId: string;
}
| { status: "expired" }
> {
const lastSyncedHistoryId = Number.parseInt(
emailAccount?.lastSyncedHistoryId || "0",
);

// If the gap is too large (e.g. > 500 items), we start from currentHistoryId - 500.
// This prevents timeouts and runaway processing costs if the system falls way behind.
const startHistoryIdNum = Math.max(
lastSyncedHistoryId,
webhookHistoryId - 500,
);
const startHistoryId =
options?.startHistoryId || startHistoryIdNum.toString();

// Log if we are intentionally skipping emails to keep the system stable
if (startHistoryIdNum > lastSyncedHistoryId && !options?.startHistoryId) {
logger.warn("Skipping history items due to large gap", {
lastSyncedHistoryId,
webhookHistoryId,
effectiveStartHistoryId: startHistoryIdNum,
skippedHistoryItems: startHistoryIdNum - lastSyncedHistoryId,
});
}

logger.info("Listing history", {
startHistoryId,
lastSyncedHistoryId: emailAccount?.lastSyncedHistoryId,
gmailHistoryId: startHistoryId,
});

try {
const data = await getHistory(gmail, {
startHistoryId,
historyTypes: ["messageAdded", "labelAdded", "labelRemoved"],
maxResults: 500,
});
return { status: "success", data, startHistoryId };
} catch (error) {
// Gmail history IDs are typically valid for ~1 week. If older, Gmail returns a 404.
// In this case, we reset the sync point to the current history ID.
if (isHistoryIdExpiredError(error)) {
logger.warn("HistoryId expired, resetting to current", {
expiredHistoryId: startHistoryId,
newHistoryId: webhookHistoryId,
});
return { status: "expired" };
}
throw error;
}
}
Loading