diff --git a/apps/web/app/(landing)/welcome-upgrade/page.tsx b/apps/web/app/(landing)/welcome-upgrade/page.tsx
index d73d7d79ed..c9c7bc1a24 100644
--- a/apps/web/app/(landing)/welcome-upgrade/page.tsx
+++ b/apps/web/app/(landing)/welcome-upgrade/page.tsx
@@ -18,7 +18,7 @@ export default function WelcomeUpgradePage() {
Spend 50% less time on email
- Join 8,000+ users that use Inbox Zero
+ Join 9,000+ users that use Inbox Zero
to be more productive!
diff --git a/apps/web/app/api/ai/cold-email/controller.ts b/apps/web/app/api/ai/cold-email/controller.ts
index 9d25e5a3e8..de3d22b442 100644
--- a/apps/web/app/api/ai/cold-email/controller.ts
+++ b/apps/web/app/api/ai/cold-email/controller.ts
@@ -50,14 +50,14 @@ export async function isColdEmail({
});
if (coldEmail) {
- logger.log(`Already marked as cold email. ${email.from}`);
+ logger.info(`Already marked as cold email. ${email.from}`);
return { isColdEmail: true, reason: "ai-already-labeled" };
}
// otherwise run through ai to see if it's a cold email
const res = await aiIsColdEmail(email, user);
- logger.log(`AI is cold email: ${res.coldEmail}`);
+ logger.info(`AI is cold email: ${res.coldEmail}`);
return {
isColdEmail: !!res.coldEmail,
diff --git a/apps/web/app/api/google/webhook/process-history.ts b/apps/web/app/api/google/webhook/process-history.ts
index d03b84b8f7..05768d5469 100644
--- a/apps/web/app/api/google/webhook/process-history.ts
+++ b/apps/web/app/api/google/webhook/process-history.ts
@@ -22,7 +22,7 @@ import { runColdEmailBlocker } from "@/app/api/ai/cold-email/controller";
import { captureException } from "@/utils/error";
import { runRulesOnMessage } from "@/utils/ai/choose-rule/run-rules";
import { blockUnsubscribedEmails } from "@/app/api/google/webhook/block-unsubscribed-emails";
-import { categorizeSender } from "@/utils/actions/categorize";
+import { categorizeSender } from "@/utils/categorize/senders/categorize";
import { unwatchEmails } from "@/app/api/google/watch/controller";
import { createScopedLogger } from "@/utils/logger";
import { markMessageAsProcessing } from "@/utils/redis/message-processing";
@@ -83,7 +83,7 @@ export async function processHistoryForUser(
: undefined;
if (!premium) {
- logger.log(`Account not premium. email: ${email}`);
+ logger.info(`Account not premium. email: ${email}`);
await unwatchEmails(account);
return NextResponse.json({ ok: true });
}
@@ -147,7 +147,7 @@ export async function processHistoryForUser(
historyId - 500, // avoid going too far back
).toString();
- logger.log(
+ logger.info(
`Listing history... Start: ${startHistoryId} lastSyncedHistoryId: ${account.user.lastSyncedHistoryId} gmailHistoryId: ${startHistoryId} email: ${email}`,
);
@@ -162,7 +162,7 @@ export async function processHistoryForUser(
});
if (history.data.history) {
- logger.log(
+ logger.info(
`Processing... email: ${email} startHistoryId: ${startHistoryId} historyId: ${history.data.historyId}`,
);
@@ -188,7 +188,7 @@ export async function processHistoryForUser(
},
});
} else {
- logger.log(
+ logger.info(
`No history. startHistoryId: ${startHistoryId}. ${JSON.stringify(decodedData)}`,
);
@@ -199,7 +199,7 @@ export async function processHistoryForUser(
});
}
- logger.log(`Completed. ${JSON.stringify(decodedData)}`);
+ logger.info(`Completed. ${JSON.stringify(decodedData)}`);
return NextResponse.json({ ok: true });
} catch (error) {
@@ -300,13 +300,13 @@ async function processHistoryItem(
});
if (!isFree) {
- logger.log(
+ logger.info(
`Skipping. Message already being processed. email: ${user.email} messageId: ${messageId}`,
);
return;
}
- logger.log(
+ logger.info(
`Getting message... email: ${user.email} messageId: ${messageId} threadId: ${threadId}`,
);
@@ -323,7 +323,7 @@ async function processHistoryItem(
// if the rule has already been executed, skip
if (hasExistingRule) {
- logger.log("Skipping. Rule already exists.");
+ logger.info("Skipping. Rule already exists.");
return;
}
@@ -340,7 +340,7 @@ async function processHistoryItem(
});
if (blocked) {
- logger.log(
+ logger.info(
`Skipping. Blocked unsubscribed email. email: ${user.email} messageId: ${messageId} threadId: ${threadId}`,
);
return;
@@ -355,7 +355,7 @@ async function processHistoryItem(
);
if (shouldRunBlocker) {
- logger.log("Running cold email blocker...");
+ logger.info("Running cold email blocker...");
const hasPreviousEmail = await hasPreviousEmailsFromSenderOrDomain(
gmail,
@@ -395,12 +395,12 @@ async function processHistoryItem(
select: { category: true },
});
if (!existingSender?.category) {
- await categorizeSender(sender, user, gmail, accessToken);
+ await categorizeSender(sender, user, gmail);
}
}
if (hasAutomationRules && hasAiAutomationAccess) {
- logger.log("Running rules...");
+ logger.info("Running rules...");
await runRulesOnMessage({
gmail,
@@ -413,7 +413,7 @@ async function processHistoryItem(
} catch (error: any) {
// gmail bug or snoozed email: https://stackoverflow.com/questions/65290987/gmail-api-getmessage-method-returns-404-for-message-gotten-from-listhistory-meth
if (error.message === "Requested entity was not found.") {
- logger.log(
+ logger.info(
`Message not found. email: ${user.email} messageId: ${messageId} threadId: ${threadId}`,
);
return;
diff --git a/apps/web/app/api/user/categorize/senders/batch/handle-batch-validation.ts b/apps/web/app/api/user/categorize/senders/batch/handle-batch-validation.ts
new file mode 100644
index 0000000000..a6cf7f81b3
--- /dev/null
+++ b/apps/web/app/api/user/categorize/senders/batch/handle-batch-validation.ts
@@ -0,0 +1,7 @@
+import { z } from "zod";
+
+export const aiCategorizeSendersSchema = z.object({
+ userId: z.string(),
+ senders: z.array(z.string()),
+});
+export type AiCategorizeSenders = z.infer
;
diff --git a/apps/web/app/api/user/categorize/senders/batch/handle-batch.ts b/apps/web/app/api/user/categorize/senders/batch/handle-batch.ts
new file mode 100644
index 0000000000..fedadf489f
--- /dev/null
+++ b/apps/web/app/api/user/categorize/senders/batch/handle-batch.ts
@@ -0,0 +1,138 @@
+import { NextResponse } from "next/server";
+import { aiCategorizeSendersSchema } from "@/app/api/user/categorize/senders/batch/handle-batch-validation";
+import { getThreadsFromSender } from "@/utils/gmail/thread";
+import {
+ categorizeWithAi,
+ getCategories,
+ updateSenderCategory,
+} from "@/utils/categorize/senders/categorize";
+import { isDefined } from "@/utils/types";
+import { isActionError } from "@/utils/error";
+import { validateUserAndAiAccess } from "@/utils/user/validate";
+import { getGmailClientWithRefresh } from "@/utils/gmail/client";
+import { UNKNOWN_CATEGORY } from "@/utils/ai/categorize-sender/ai-categorize-senders";
+import { createScopedLogger } from "@/utils/logger";
+import prisma from "@/utils/prisma";
+import { saveCategorizationProgress } from "@/utils/redis/categorization-progress";
+
+const logger = createScopedLogger("api/user/categorize/senders/batch");
+
+export async function handleBatchRequest(
+ request: Request,
+): Promise {
+ try {
+ const handleBatchResult = await handleBatchInternal(request);
+ if (isActionError(handleBatchResult))
+ return NextResponse.json({ error: handleBatchResult.error });
+ return NextResponse.json({ ok: true });
+ } catch (error) {
+ logger.error("handleBatchRequest", { error });
+ return NextResponse.json(
+ { error: "Internal server error" },
+ { status: 500 },
+ );
+ }
+}
+
+async function handleBatchInternal(request: Request) {
+ const json = await request.json();
+ const body = aiCategorizeSendersSchema.parse(json);
+ const { userId, senders } = body;
+
+ logger.trace(`handleBatch ${userId}: ${senders.length} senders`);
+
+ const userResult = await validateUserAndAiAccess(userId);
+ if (isActionError(userResult)) return userResult;
+ const { user } = userResult;
+
+ const categoriesResult = await getCategories(userId);
+ if (isActionError(categoriesResult)) return categoriesResult;
+ const { categories } = categoriesResult;
+
+ const account = await prisma.account.findFirst({
+ where: { user: { id: userId }, provider: "google" },
+ select: {
+ access_token: true,
+ refresh_token: true,
+ expires_at: true,
+ providerAccountId: true,
+ },
+ });
+
+ if (!account) return { error: "No account found" };
+ if (!account.access_token || !account.refresh_token)
+ return { error: "No access or refresh token" };
+
+ const gmail = await getGmailClientWithRefresh(
+ {
+ accessToken: account.access_token,
+ refreshToken: account.refresh_token,
+ expiryDate: account.expires_at,
+ },
+ account.providerAccountId,
+ );
+ if (!gmail) return { error: "No Gmail client" };
+
+ const sendersWithSnippets: Map = new Map();
+
+ // 1. fetch 3 messages for each sender
+ for (const sender of senders) {
+ const threadsFromSender = await getThreadsFromSender(gmail, sender, 3);
+ const snippets = threadsFromSender.map((t) => t.snippet).filter(isDefined);
+ sendersWithSnippets.set(sender, snippets);
+ }
+
+ // 2. categorize senders with ai
+ const results = await categorizeWithAi({
+ user,
+ sendersWithSnippets,
+ categories,
+ });
+
+ // 3. save categorized senders to db
+ for (const result of results) {
+ await updateSenderCategory({
+ sender: result.sender,
+ categories,
+ categoryName: result.category ?? UNKNOWN_CATEGORY,
+ userId,
+ });
+ }
+
+ // // 4. categorize senders that were not categorized
+ // const uncategorizedSenders = results.filter(isUncategorized);
+
+ // await saveCategorizationProgress({
+ // userId,
+ // incrementCompleted: senders.length - uncategorizedSenders.length,
+ // });
+
+ // for (const sender of uncategorizedSenders) {
+ // try {
+ // await categorizeSender(sender.sender, user, gmail, categories);
+ // } catch (error) {
+ // logger.error("Error categorizing sender", {
+ // sender: sender.sender,
+ // error,
+ // });
+ // captureException(error);
+ // }
+
+ // await saveCategorizationProgress({
+ // userId,
+ // incrementCompleted: 1,
+ // });
+ // }
+
+ await saveCategorizationProgress({
+ userId,
+ incrementCompleted: senders.length,
+ });
+
+ return NextResponse.json({ ok: true });
+}
+
+// const isUncategorized = (r: { category?: string }) =>
+// !r.category ||
+// r.category === UNKNOWN_CATEGORY ||
+// r.category === REQUEST_MORE_INFORMATION_CATEGORY;
diff --git a/apps/web/app/api/user/categorize/senders/batch/route.ts b/apps/web/app/api/user/categorize/senders/batch/route.ts
new file mode 100644
index 0000000000..63ad2beafa
--- /dev/null
+++ b/apps/web/app/api/user/categorize/senders/batch/route.ts
@@ -0,0 +1,5 @@
+import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";
+import { withError } from "@/utils/middleware";
+import { handleBatchRequest } from "@/app/api/user/categorize/senders/batch/handle-batch";
+
+export const POST = withError(verifySignatureAppRouter(handleBatchRequest));
diff --git a/apps/web/app/api/user/categorize/senders/batch/simple/route.ts b/apps/web/app/api/user/categorize/senders/batch/simple/route.ts
new file mode 100644
index 0000000000..fc345885db
--- /dev/null
+++ b/apps/web/app/api/user/categorize/senders/batch/simple/route.ts
@@ -0,0 +1,20 @@
+import { NextResponse } from "next/server";
+import { headers } from "next/headers";
+import { withError } from "@/utils/middleware";
+import { handleBatchRequest } from "@/app/api/user/categorize/senders/batch/handle-batch";
+import { env } from "@/env";
+import { isValidInternalApiKey } from "@/utils/internal-api";
+
+// Fallback when Qstash is not in use
+export const POST = withError(async (request) => {
+ if (env.QSTASH_TOKEN) {
+ return NextResponse.json({
+ error: "Qstash is set. This endpoint is disabled.",
+ });
+ }
+
+ if (!isValidInternalApiKey(headers()))
+ return NextResponse.json({ error: "Invalid API key" });
+
+ return handleBatchRequest(request);
+});
diff --git a/apps/web/app/api/user/categorize/senders/find-senders.ts b/apps/web/app/api/user/categorize/senders/find-senders.ts
deleted file mode 100644
index 289824d6a5..0000000000
--- a/apps/web/app/api/user/categorize/senders/find-senders.ts
+++ /dev/null
@@ -1,78 +0,0 @@
-import type { gmail_v1 } from "@googleapis/gmail";
-import { getMessagesBatch } from "@/utils/gmail/message";
-import { getThreadsWithNextPageToken } from "@/utils/gmail/thread";
-import { isDefined } from "@/utils/types";
-import type { SenderMap } from "@/app/api/user/categorize/senders/types";
-
-export async function findSendersWithPagination(
- gmail: gmail_v1.Gmail,
- accessToken: string,
- maxPages: number,
-) {
- const allSenders: SenderMap = new Map();
- let nextPageToken: string | undefined = undefined;
- let currentPage = 0;
-
- while (currentPage < maxPages) {
- const { senders, nextPageToken: newNextPageToken } = await findSenders(
- gmail,
- accessToken,
- nextPageToken,
- );
-
- for (const [sender, messages] of Object.entries(senders)) {
- const existingMessages = allSenders.get(sender) ?? [];
- allSenders.set(sender, [...existingMessages, ...messages]);
- }
-
- if (!newNextPageToken) break; // No more pages
-
- nextPageToken = newNextPageToken;
- currentPage++;
- }
-
- return { senders: allSenders, nextPageToken };
-}
-
-export async function findSenders(
- gmail: gmail_v1.Gmail,
- accessToken: string,
- pageToken?: string,
- maxResults = 50,
-) {
- const senders: SenderMap = new Map();
-
- const { threads, nextPageToken } = await getThreadsWithNextPageToken({
- q: "-in:sent",
- gmail,
- maxResults,
- pageToken,
- });
-
- const messageIds = threads.map((t) => t.id).filter(isDefined);
- const messages = await getMessagesBatch(messageIds, accessToken);
-
- for (const message of messages) {
- const sender = message.headers.from;
- if (sender) {
- const existingMessages = senders.get(sender) ?? [];
- senders.set(sender, [...existingMessages, message]);
- }
- }
-
- return { senders, nextPageToken };
-}
-
-function isNotFoundError(error: unknown): boolean {
- return (
- typeof error === "object" &&
- error !== null &&
- "errors" in error &&
- Array.isArray((error as any).errors) &&
- (error as any).errors.some(
- (e: any) =>
- e.message === "Requested entity was not found." &&
- e.reason === "notFound",
- )
- );
-}
diff --git a/apps/web/app/api/user/categorize/senders/progress/route.ts b/apps/web/app/api/user/categorize/senders/progress/route.ts
new file mode 100644
index 0000000000..9ad94fb887
--- /dev/null
+++ b/apps/web/app/api/user/categorize/senders/progress/route.ts
@@ -0,0 +1,21 @@
+import { NextResponse } from "next/server";
+import { auth } from "@/app/api/auth/[...nextauth]/auth";
+import { getCategorizationProgress } from "@/utils/redis/categorization-progress";
+import { withError } from "@/utils/middleware";
+
+export type CategorizeProgress = Awaited<
+ ReturnType
+>;
+
+async function getCategorizeProgress(userId: string) {
+ const progress = await getCategorizationProgress({ userId });
+ return progress;
+}
+
+export const GET = withError(async () => {
+ const session = await auth();
+ if (!session?.user) return NextResponse.json({ error: "Not authenticated" });
+
+ const result = await getCategorizeProgress(session.user.id);
+ return NextResponse.json(result);
+});
diff --git a/apps/web/app/api/user/categorize/senders/uncategorized/get-uncategorized-senders.ts b/apps/web/app/api/user/categorize/senders/uncategorized/get-uncategorized-senders.ts
new file mode 100644
index 0000000000..c029e4f2a1
--- /dev/null
+++ b/apps/web/app/api/user/categorize/senders/uncategorized/get-uncategorized-senders.ts
@@ -0,0 +1,55 @@
+import { getSenders } from "@inboxzero/tinybird";
+import prisma from "@/utils/prisma";
+
+const MAX_ITERATIONS = 200;
+
+export async function getUncategorizedSenders({
+ email,
+ userId,
+ offset = 0,
+ limit = 100,
+}: {
+ email: string;
+ userId: string;
+ offset?: number;
+ limit?: number;
+}) {
+ let uncategorizedSenders: string[] = [];
+ let currentOffset = offset;
+
+ while (uncategorizedSenders.length === 0 && currentOffset < MAX_ITERATIONS) {
+ const result = await getSenders({
+ ownerEmail: email,
+ limit,
+ offset: currentOffset,
+ });
+ const allSenders = result.data.map((sender) => sender.from);
+
+ const existingSenders = await prisma.newsletter.findMany({
+ where: {
+ email: { in: allSenders },
+ userId,
+ category: { isNot: null },
+ },
+ select: { email: true },
+ });
+
+ const existingSenderEmails = new Set(existingSenders.map((s) => s.email));
+
+ uncategorizedSenders = allSenders.filter(
+ (email) => !existingSenderEmails.has(email),
+ );
+
+ // Break the loop if no more senders are available
+ if (allSenders.length < limit) {
+ return { uncategorizedSenders };
+ }
+
+ currentOffset += limit;
+ }
+
+ return {
+ uncategorizedSenders,
+ nextOffset: currentOffset, // Only return nextOffset if there might be more
+ };
+}
diff --git a/apps/web/app/api/user/categorize/senders/uncategorized/route.ts b/apps/web/app/api/user/categorize/senders/uncategorized/route.ts
index 4881d474fe..e56d849c6a 100644
--- a/apps/web/app/api/user/categorize/senders/uncategorized/route.ts
+++ b/apps/web/app/api/user/categorize/senders/uncategorized/route.ts
@@ -1,72 +1,19 @@
import { NextResponse } from "next/server";
import { withError } from "@/utils/middleware";
import { getSessionAndGmailClient } from "@/utils/actions/helpers";
-import { getSenders } from "@inboxzero/tinybird";
-import prisma from "@/utils/prisma";
+import { isActionError } from "@/utils/error";
+import { getUncategorizedSenders } from "@/app/api/user/categorize/senders/uncategorized/get-uncategorized-senders";
export type UncategorizedSendersResponse = {
uncategorizedSenders: string[];
nextOffset?: number;
};
-async function getUncategorizedSenders({
- email,
- userId,
- offset = 0,
- limit = 100,
-}: {
- email: string;
- userId: string;
- offset?: number;
- limit?: number;
-}) {
- let uncategorizedSenders: string[] = [];
- let currentOffset = offset;
-
- while (uncategorizedSenders.length === 0) {
- const result = await getSenders({
- ownerEmail: email,
- limit,
- offset: currentOffset,
- });
- const allSenders = result.data.map((sender) => sender.from);
-
- const existingSenders = await prisma.newsletter.findMany({
- where: {
- email: { in: allSenders },
- userId,
- category: { isNot: null },
- },
- select: { email: true },
- });
-
- const existingSenderEmails = new Set(existingSenders.map((s) => s.email));
-
- uncategorizedSenders = allSenders.filter(
- (email) => !existingSenderEmails.has(email),
- );
-
- // Break the loop if no more senders are available
- if (allSenders.length < limit) {
- return { uncategorizedSenders };
- }
-
- currentOffset += limit;
- }
-
- return {
- uncategorizedSenders,
- nextOffset: currentOffset, // Only return nextOffset if there might be more
- };
-}
-
export const GET = withError(async (request: Request) => {
- const { gmail, user, error, session } = await getSessionAndGmailClient();
- if (!user?.email) return NextResponse.json({ error: "Not authenticated" });
- if (error) return NextResponse.json({ error });
- if (!gmail) return NextResponse.json({ error: "Could not load Gmail" });
- if (!session?.accessToken)
- return NextResponse.json({ error: "No access token" });
+ const sessionResult = await getSessionAndGmailClient();
+ if (isActionError(sessionResult))
+ return NextResponse.json({ error: sessionResult.error });
+ const { user } = sessionResult;
const url = new URL(request.url);
const offset = Number.parseInt(url.searchParams.get("offset") || "0");
diff --git a/apps/web/components/ProgressPanel.tsx b/apps/web/components/ProgressPanel.tsx
new file mode 100644
index 0000000000..dc0e733daf
--- /dev/null
+++ b/apps/web/components/ProgressPanel.tsx
@@ -0,0 +1,57 @@
+"use client";
+
+import { AnimatePresence, motion } from "framer-motion";
+import { ProgressBar } from "@tremor/react";
+import { cn } from "@/utils";
+
+export function ProgressPanel({
+ totalItems,
+ remainingItems,
+ inProgressText,
+ completedText,
+ itemLabel,
+}: {
+ totalItems: number;
+ remainingItems: number;
+ inProgressText: string;
+ completedText: string;
+ itemLabel: string;
+}) {
+ const totalProcessed = totalItems - remainingItems;
+ const progress = (totalProcessed / totalItems) * 100;
+ const isCompleted = progress === 100;
+
+ if (!totalItems) return null;
+
+ return (
+
+
+
+
+
+
+ {isCompleted ? completedText : inProgressText}
+
+
+ {totalProcessed} of {totalItems} {itemLabel} processed
+
+
+
+
+
+ );
+}
diff --git a/apps/web/components/email-list/EmailList.tsx b/apps/web/components/email-list/EmailList.tsx
index 78135f432a..0054f4ad6e 100644
--- a/apps/web/components/email-list/EmailList.tsx
+++ b/apps/web/components/email-list/EmailList.tsx
@@ -28,7 +28,7 @@ import {
} from "@/components/ui/resizable";
import { runAiRules } from "@/utils/queue/email-actions";
import { selectedEmailAtom } from "@/store/email";
-import { categorizeEmailAction } from "@/utils/actions/categorize";
+import { categorizeEmailAction } from "@/utils/actions/categorize-email";
import { Button } from "@/components/ui/button";
import { ButtonLoader } from "@/components/Loading";
import {
diff --git a/apps/web/env.ts b/apps/web/env.ts
index 8a5d712bf0..e7733de4b7 100644
--- a/apps/web/env.ts
+++ b/apps/web/env.ts
@@ -7,6 +7,7 @@ export const env = createEnv({
NODE_ENV: z.enum(["development", "production", "test"]),
DATABASE_URL: z.string().url(),
NEXTAUTH_SECRET: z.string().min(1),
+ NEXTAUTH_URL: z.string().optional(),
GOOGLE_CLIENT_ID: z.string().min(1),
GOOGLE_CLIENT_SECRET: z.string().min(1),
OPENAI_API_KEY: z.string().min(1),
@@ -16,6 +17,9 @@ export const env = createEnv({
BEDROCK_REGION: z.string().default("us-west-2"),
UPSTASH_REDIS_URL: z.string().min(1),
UPSTASH_REDIS_TOKEN: z.string().min(1),
+ QSTASH_TOKEN: z.string().optional(),
+ QSTASH_CURRENT_SIGNING_KEY: z.string().optional(),
+ QSTASH_NEXT_SIGNING_KEY: z.string().optional(),
GOOGLE_PUBSUB_TOPIC_NAME: z.string().min(1),
GOOGLE_PUBSUB_VERIFICATION_TOKEN: z.string().optional(),
SENTRY_AUTH_TOKEN: z.string().optional(),
@@ -41,6 +45,8 @@ export const env = createEnv({
.string()
.optional()
.transform((value) => value?.split(",")),
+ WEBHOOK_URL: z.string().optional(),
+ INTERNAL_API_KEY: z.string().optional(),
// license
LICENSE_1_SEAT_VARIANT_ID: z.coerce.number().optional(),
diff --git a/apps/web/next.config.mjs b/apps/web/next.config.mjs
index def9d36f37..35146e1bc1 100644
--- a/apps/web/next.config.mjs
+++ b/apps/web/next.config.mjs
@@ -53,7 +53,7 @@ const nextConfig = {
return [
{
source: "/",
- destination: "/bulk-unsubscribe",
+ destination: "/automation",
has: [
{
type: "cookie",
@@ -64,7 +64,7 @@ const nextConfig = {
},
{
source: "/",
- destination: "/bulk-unsubscribe",
+ destination: "/automation",
has: [
{
type: "cookie",
@@ -75,7 +75,7 @@ const nextConfig = {
},
{
source: "/",
- destination: "/bulk-unsubscribe",
+ destination: "/automation",
has: [
{
type: "cookie",
@@ -86,7 +86,7 @@ const nextConfig = {
},
{
source: "/",
- destination: "/bulk-unsubscribe",
+ destination: "/automation",
has: [
{
type: "cookie",
diff --git a/apps/web/package.json b/apps/web/package.json
index 9cb174e351..ff4642fb12 100644
--- a/apps/web/package.json
+++ b/apps/web/package.json
@@ -63,6 +63,7 @@
"@tanstack/react-query": "^5.59.16",
"@tanstack/react-table": "^8.20.5",
"@tremor/react": "^3.18.3",
+ "@upstash/qstash": "^2.7.16",
"@upstash/redis": "^1.34.3",
"@vercel/analytics": "^1.3.2",
"@vercel/speed-insights": "^1.0.14",
diff --git a/apps/web/prisma/migrations/20241119163400_categorize_date_range/migration.sql b/apps/web/prisma/migrations/20241119163400_categorize_date_range/migration.sql
new file mode 100644
index 0000000000..3e79e2ec29
--- /dev/null
+++ b/apps/web/prisma/migrations/20241119163400_categorize_date_range/migration.sql
@@ -0,0 +1,10 @@
+/*
+ Warnings:
+
+ - You are about to drop the column `categorizeEmails` on the `User` table. All the data in the column will be lost.
+
+*/
+-- AlterTable
+ALTER TABLE "User" DROP COLUMN "categorizeEmails",
+ADD COLUMN "newestCategorizedEmailTime" TIMESTAMP(3),
+ADD COLUMN "oldestCategorizedEmailTime" TIMESTAMP(3);
diff --git a/apps/web/prisma/migrations/20241125052523_remove_categorized_time/migration.sql b/apps/web/prisma/migrations/20241125052523_remove_categorized_time/migration.sql
new file mode 100644
index 0000000000..2bbd7bd7a5
--- /dev/null
+++ b/apps/web/prisma/migrations/20241125052523_remove_categorized_time/migration.sql
@@ -0,0 +1,10 @@
+/*
+ Warnings:
+
+ - You are about to drop the column `newestCategorizedEmailTime` on the `User` table. All the data in the column will be lost.
+ - You are about to drop the column `oldestCategorizedEmailTime` on the `User` table. All the data in the column will be lost.
+
+*/
+-- AlterTable
+ALTER TABLE "User" DROP COLUMN "newestCategorizedEmailTime",
+DROP COLUMN "oldestCategorizedEmailTime";
diff --git a/apps/web/prisma/schema.prisma b/apps/web/prisma/schema.prisma
index b125e0de48..9437a98a3f 100644
--- a/apps/web/prisma/schema.prisma
+++ b/apps/web/prisma/schema.prisma
@@ -70,11 +70,12 @@ model User {
statsEmailFrequency Frequency @default(WEEKLY)
summaryEmailFrequency Frequency @default(WEEKLY)
lastSummaryEmailAt DateTime?
- categorizeEmails Boolean @default(true)
coldEmailBlocker ColdEmailSetting?
coldEmailPrompt String?
rulesPrompt String?
- autoCategorizeSenders Boolean @default(false)
+
+ // categorization
+ autoCategorizeSenders Boolean @default(false)
// premium can be shared among multiple users
premiumId String?
diff --git a/apps/web/utils/actions/ai-rule.ts b/apps/web/utils/actions/ai-rule.ts
index b855732299..93eba6cacb 100644
--- a/apps/web/utils/actions/ai-rule.ts
+++ b/apps/web/utils/actions/ai-rule.ts
@@ -42,9 +42,9 @@ import { withActionInstrumentation } from "@/utils/actions/middleware";
export const runRulesAction = withActionInstrumentation(
"runRules",
async ({ email, force }: { email: EmailForAction; force: boolean }) => {
- const { gmail, user: u, error } = await getSessionAndGmailClient();
- if (error) return { error };
- if (!gmail) return { error: "Could not load Gmail" };
+ const sessionResult = await getSessionAndGmailClient();
+ if (isActionError(sessionResult)) return sessionResult;
+ const { gmail, user: u } = sessionResult;
const user = await prisma.user.findUnique({
where: { id: u.id },
@@ -101,9 +101,9 @@ export const runRulesAction = withActionInstrumentation(
export const testAiAction = withActionInstrumentation(
"testAi",
async ({ messageId, threadId }: { messageId: string; threadId: string }) => {
- const { gmail, user: u, error } = await getSessionAndGmailClient();
- if (error) return { error };
- if (!gmail) return { error: "Could not load Gmail" };
+ const sessionResult = await getSessionAndGmailClient();
+ if (isActionError(sessionResult)) return sessionResult;
+ const { gmail, user: u } = sessionResult;
const user = await prisma.user.findUnique({
where: { id: u.id },
diff --git a/apps/web/utils/actions/categorize-email.ts b/apps/web/utils/actions/categorize-email.ts
new file mode 100644
index 0000000000..a525cac35f
--- /dev/null
+++ b/apps/web/utils/actions/categorize-email.ts
@@ -0,0 +1,56 @@
+"use server";
+
+import { categorize } from "@/app/api/ai/categorize/controller";
+import {
+ type CategorizeBodyWithHtml,
+ categorizeBodyWithHtml,
+} from "@/app/api/ai/categorize/validation";
+import { getSessionAndGmailClient } from "@/utils/actions/helpers";
+import { hasPreviousEmailsFromSender } from "@/utils/gmail/message";
+import { emailToContent } from "@/utils/mail";
+import { findUnsubscribeLink } from "@/utils/parse/parseHtml.server";
+import { truncate } from "@/utils/string";
+import { withActionInstrumentation } from "@/utils/actions/middleware";
+import { validateUserAndAiAccess } from "@/utils/user/validate";
+import { isActionError } from "@/utils/error";
+
+export const categorizeEmailAction = withActionInstrumentation(
+ "categorizeEmail",
+ async (unsafeData: CategorizeBodyWithHtml) => {
+ const sessionResult = await getSessionAndGmailClient();
+ if (isActionError(sessionResult)) return sessionResult;
+ const { gmail, user: u } = sessionResult;
+
+ const userResult = await validateUserAndAiAccess(u.id);
+ if (isActionError(userResult)) return userResult;
+ const { user } = userResult;
+
+ const {
+ success,
+ data,
+ error: parseError,
+ } = categorizeBodyWithHtml.safeParse(unsafeData);
+ if (!success) return { error: parseError.message };
+
+ const content = emailToContent(data);
+
+ const unsubscribeLink = findUnsubscribeLink(data.textHtml);
+ const hasPreviousEmail = await hasPreviousEmailsFromSender(gmail, data);
+
+ const res = await categorize(
+ {
+ ...data,
+ content,
+ snippet: data.snippet || truncate(content, 300),
+ aiApiKey: user.aiApiKey,
+ aiProvider: user.aiProvider,
+ aiModel: user.aiModel,
+ unsubscribeLink,
+ hasPreviousEmail,
+ },
+ { email: u.email },
+ );
+
+ return { category: res?.category };
+ },
+);
diff --git a/apps/web/utils/actions/categorize.ts b/apps/web/utils/actions/categorize.ts
index 8465b49af2..a191703298 100644
--- a/apps/web/utils/actions/categorize.ts
+++ b/apps/web/utils/actions/categorize.ts
@@ -1,373 +1,114 @@
"use server";
import { revalidatePath } from "next/cache";
-import uniq from "lodash/uniq";
-import type { gmail_v1 } from "@googleapis/gmail";
-import { categorize } from "@/app/api/ai/categorize/controller";
-import {
- type CategorizeBodyWithHtml,
- categorizeBodyWithHtml,
-} from "@/app/api/ai/categorize/validation";
+import { getSenders } from "@inboxzero/tinybird";
import {
type CreateCategoryBody,
createCategoryBody,
} from "@/utils/actions/validation";
import { getSessionAndGmailClient } from "@/utils/actions/helpers";
-import { hasPreviousEmailsFromSender } from "@/utils/gmail/message";
-import { emailToContent } from "@/utils/mail";
-import { findUnsubscribeLink } from "@/utils/parse/parseHtml.server";
-import { truncate } from "@/utils/string";
import prisma, { isDuplicateError } from "@/utils/prisma";
import { withActionInstrumentation } from "@/utils/actions/middleware";
-import {
- aiCategorizeSenders,
- REQUEST_MORE_INFORMATION_CATEGORY,
-} from "@/utils/ai/categorize-sender/ai-categorize-senders";
-import { findSenders } from "@/app/api/user/categorize/senders/find-senders";
-import { defaultCategory, type SenderCategory } from "@/utils/categories";
-import { isNewsletterSender } from "@/utils/ai/group/find-newsletters";
-import { isReceiptSender } from "@/utils/ai/group/find-receipts";
+import { defaultCategory } from "@/utils/categories";
import { auth } from "@/app/api/auth/[...nextauth]/auth";
-import { aiCategorizeSender } from "@/utils/ai/categorize-sender/ai-categorize-single-sender";
-import { getThreadsFromSender } from "@/utils/gmail/thread";
-import { isDefined } from "@/utils/types";
-import type { Category, User } from "@prisma/client";
-import type { UserAIFields } from "@/utils/llms/types";
-import { getUserCategories } from "@/utils/category.server";
-import { hasAiAccess } from "@/utils/premium";
-
-export const categorizeEmailAction = withActionInstrumentation(
- "categorizeEmail",
- async (unsafeData: CategorizeBodyWithHtml) => {
- const { gmail, user: u, error } = await getSessionAndGmailClient();
- if (error) return { error };
- if (!gmail) return { error: "Could not load Gmail" };
-
- const {
- success,
- data,
- error: parseError,
- } = categorizeBodyWithHtml.safeParse(unsafeData);
- if (!success) return { error: parseError.message };
-
- const content = emailToContent(data);
-
- const user = await prisma.user.findUnique({
- where: { id: u.id },
- select: {
- aiProvider: true,
- aiModel: true,
- aiApiKey: true,
- },
- });
+import { categorizeSender } from "@/utils/categorize/senders/categorize";
+import { validateUserAndAiAccess } from "@/utils/user/validate";
+import { isActionError } from "@/utils/error";
+import { publishToAiCategorizeSendersQueue } from "@/utils/upstash";
+import { createScopedLogger } from "@/utils/logger";
+import { saveCategorizationTotalItems } from "@/utils/redis/categorization-progress";
- if (!user) return { error: "User not found" };
-
- const unsubscribeLink = findUnsubscribeLink(data.textHtml);
- const hasPreviousEmail = await hasPreviousEmailsFromSender(gmail, data);
-
- const res = await categorize(
- {
- ...data,
- content,
- snippet: data.snippet || truncate(content, 300),
- aiApiKey: user.aiApiKey,
- aiProvider: user.aiProvider,
- aiModel: user.aiModel,
- unsubscribeLink,
- hasPreviousEmail,
- },
- { email: u.email! },
- );
-
- return { category: res?.category };
- },
-);
+const logger = createScopedLogger("actions/categorize");
-async function saveResult(
- result: { sender: string; category?: string },
- categories: { id: string; name: string }[],
- userId: string,
-) {
- if (!result.category) return;
- const { newCategory } = await updateSenderCategory({
- sender: result.sender,
- categories,
- categoryName: result.category,
- userId,
- });
- if (newCategory) categories.push(newCategory);
-}
-
-export const categorizeSendersAction = withActionInstrumentation(
- "categorizeSenders",
+export const bulkCategorizeSendersAction = withActionInstrumentation(
+ "bulkCategorizeSenders",
async () => {
- console.log("categorizeSendersAction");
-
- const { gmail, user: u, error, session } = await getSessionAndGmailClient();
- if (error) return { error };
- if (!gmail) return { error: "Could not load Gmail" };
- if (!session?.accessToken) return { error: "No access token" };
-
- const user = await prisma.user.findUnique({
- where: { id: u.id },
- select: {
- email: true,
- aiProvider: true,
- aiModel: true,
- aiApiKey: true,
- premium: { select: { aiAutomationAccess: true } },
- },
- });
-
- if (!user) return { error: "User not found" };
+ const sessionResult = await getSessionAndGmailClient();
+ if (isActionError(sessionResult)) return sessionResult;
+ const { user } = sessionResult;
- const userHasAiAccess = hasAiAccess(
- user.premium?.aiAutomationAccess,
- user.aiApiKey,
- );
+ const userResult = await validateUserAndAiAccess(user.id);
+ if (isActionError(userResult)) return userResult;
- if (!userHasAiAccess) return { error: "Please upgrade for AI access" };
+ const LIMIT = 100;
- // TODO: fetch from gmail, run ai, then fetch from gmail,...
- // we can run ai and gmail fetch in parallel
-
- const sendersResult = await findSenders(
- gmail,
- session.accessToken,
- undefined,
- 100,
- );
- // const sendersResult = await findSendersWithPagination(gmail, 5);
-
- // console.log("sendersResult", Array.from(sendersResult.senders.keys()));
-
- console.log(`Found ${sendersResult.senders.size} senders`);
-
- const senders = uniq(Array.from(sendersResult.senders.keys()));
-
- console.log(`Found ${senders.length} unique senders`);
-
- // remove senders we've already categorized
- const [existingSenders, categories] = await Promise.all([
- prisma.newsletter.findMany({
- where: { email: { in: senders }, userId: u.id },
- select: {
- email: true,
- category: { select: { name: true, description: true } },
+ async function getUncategorizedSenders(offset: number) {
+ const result = await getSenders({
+ ownerEmail: user.email,
+ limit: LIMIT,
+ offset,
+ });
+ const allSenders = result.data.map((sender) => sender.from);
+ const existingSenders = await prisma.newsletter.findMany({
+ where: {
+ email: { in: allSenders },
+ userId: user.id,
+ category: { isNot: null },
},
- }),
- getUserCategories(u.id),
- ]);
-
- if (categories.length === 0) return { error: "No categories found" };
-
- const sendersToCategorize = senders.filter(
- (sender) => !existingSenders.some((s) => s.email === sender),
- );
-
- const categorizedSenders =
- preCategorizeSendersWithStaticRules(sendersToCategorize);
-
- const sendersToCategorizeWithAi = categorizedSenders
- .filter((sender) => !sender.category)
- .map((sender) => sender.sender);
-
- console.log(
- `Found ${sendersToCategorizeWithAi.length} senders to categorize with AI`,
- );
-
- const aiResults = await aiCategorizeSenders({
- user,
- senders: sendersToCategorizeWithAi.map((sender) => ({
- emailAddress: sender,
- snippets:
- sendersResult.senders.get(sender)?.map((m) => m.snippet) || [],
- })),
- categories,
- });
-
- const results = [...categorizedSenders, ...aiResults];
-
- for (const result of results) {
- await saveResult(result, categories, u.id);
- }
-
- // categorize unknown senders
- // TODO: this will take a while. so probably break this out, or stream results as they come in
- const unknownSenders = [
- ...results,
- ...existingSenders.map((s) => ({
- sender: s.email,
- category: s.category?.name,
- })),
- ].filter(
- (r) =>
- !r.category ||
- r.category === defaultCategory.UNKNOWN.name ||
- r.category === REQUEST_MORE_INFORMATION_CATEGORY,
- );
-
- console.log(
- `Found ${unknownSenders.length} unknown senders to categorize with AI`,
- );
-
- for (const sender of unknownSenders) {
- const messages = sendersResult.senders.get(sender.sender);
-
- let previousEmails =
- messages?.map((m) => m.snippet).filter(isDefined) || [];
-
- if (previousEmails.length === 0) {
- previousEmails = await getPreviousEmails(gmail, sender.sender);
- }
-
- const aiResult = await aiCategorizeSender({
- user,
- sender: sender.sender,
- previousEmails,
- categories,
+ select: { email: true },
});
+ const existingSenderEmails = new Set(existingSenders.map((s) => s.email));
+ const uncategorizedSenders = allSenders.filter(
+ (email) => !existingSenderEmails.has(email),
+ );
- if (aiResult) {
- await saveResult(
- {
- sender: sender.sender,
- category: aiResult.category,
- },
- categories,
- u.id,
- );
- }
+ return uncategorizedSenders;
}
- revalidatePath("/smart-categories");
- },
-);
-
-export const fastCategorizeSendersAction = withActionInstrumentation(
- "fastCategorizeSenders",
- async (senderAddresses: string[]) => {
- console.log("fastCategorizeSendersAction");
-
- const { gmail, user: u, error, session } = await getSessionAndGmailClient();
- if (error) return { error };
- if (!gmail) return { error: "Could not load Gmail" };
- if (!session?.accessToken) return { error: "No access token" };
-
- const user = await prisma.user.findUnique({
- where: { id: u.id },
- select: {
- email: true,
- aiProvider: true,
- aiModel: true,
- aiApiKey: true,
- premium: { select: { aiAutomationAccess: true } },
- },
- });
-
- if (!user) return { error: "User not found" };
-
- // check if user has AI access
- const userHasAiAccess = hasAiAccess(
- user.premium?.aiAutomationAccess,
- user.aiApiKey,
- );
-
- if (!userHasAiAccess) return { error: "Please upgrade for AI access" };
-
- const senders = uniq(senderAddresses);
-
- console.log(`Found ${senders.length} unique senders`);
-
- const categories = await getUserCategories(u.id);
+ let totalUncategorizedSenders = 0;
+ let uncategorizedSenders: string[] = [];
+ for (let i = 0; i < 20; i++) {
+ const newUncategorizedSenders = await getUncategorizedSenders(i * LIMIT);
- if (categories.length === 0) return { error: "No categories found" };
+ logger.trace("Got uncategorized senders", {
+ userId: user.id,
+ uncategorizedSenders: newUncategorizedSenders.length,
+ });
- // pre-categorize senders with static rules
- const categorizedSenders = preCategorizeSendersWithStaticRules(senders);
+ if (newUncategorizedSenders.length === 0) continue;
+ uncategorizedSenders.push(...newUncategorizedSenders);
+ totalUncategorizedSenders += newUncategorizedSenders.length;
- const sendersToCategorizeWithAi = categorizedSenders
- .filter((sender) => !sender.category)
- .map((sender) => sender.sender);
+ await saveCategorizationTotalItems({
+ userId: user.id,
+ totalItems: totalUncategorizedSenders,
+ });
- console.log(
- `Found ${sendersToCategorizeWithAi.length} senders to categorize with AI`,
- );
+ logger.trace("Publishing to queue", {
+ userId: user.id,
+ uncategorizedSenders: uncategorizedSenders.length,
+ });
- // fetch snippets for each sender
- const sendersWithSnippets: Map = new Map();
+ // publish to qstash
+ await publishToAiCategorizeSendersQueue({
+ userId: user.id,
+ senders: uncategorizedSenders,
+ });
- for (const sender of sendersToCategorizeWithAi) {
- const previousEmails = await getPreviousEmails(gmail, sender);
- sendersWithSnippets.set(sender, previousEmails);
+ uncategorizedSenders = [];
}
- // categorize senders with AI
- const aiResults = await aiCategorizeSenders({
- user,
- senders: sendersToCategorizeWithAi.map((sender) => ({
- emailAddress: sender,
- snippets: sendersWithSnippets.get(sender) || [],
- })),
- categories,
+ logger.info("Queued senders for categorization", {
+ userId: user.id,
+ totalUncategorizedSenders,
});
- const results: Record = {};
-
- for (const result of [...categorizedSenders, ...aiResults]) {
- results[result.sender] = result.category;
- }
-
- for (const [sender, category] of Object.entries(results)) {
- await saveResult({ sender, category }, categories, u.id);
- }
-
- revalidatePath("/smart-categories");
-
- console.log("results", JSON.stringify(results, null, 2));
-
- return { results };
+ return { totalUncategorizedSenders };
},
);
export const categorizeSenderAction = withActionInstrumentation(
"categorizeSender",
async (senderAddress: string) => {
- console.log("categorizeSenderAction");
-
- const { gmail, user: u, error, session } = await getSessionAndGmailClient();
- if (error) return { error };
- if (!gmail) return { error: "Could not load Gmail" };
- if (!session?.accessToken) return { error: "No access token" };
-
- const user = await prisma.user.findUnique({
- where: { id: u.id },
- select: {
- id: true,
- email: true,
- aiProvider: true,
- aiModel: true,
- aiApiKey: true,
- premium: { select: { aiAutomationAccess: true } },
- },
- });
-
- if (!user) return { error: "User not found" };
+ const sessionResult = await getSessionAndGmailClient();
+ if (isActionError(sessionResult)) return sessionResult;
+ const { gmail, user: u } = sessionResult;
- const userHasAiAccess = hasAiAccess(
- user.premium?.aiAutomationAccess,
- user.aiApiKey,
- );
+ const userResult = await validateUserAndAiAccess(u.id);
+ if (isActionError(userResult)) return userResult;
+ const { user } = userResult;
- if (!userHasAiAccess) return { error: "Please upgrade for AI access" };
-
- const result = await categorizeSender(
- senderAddress,
- user,
- gmail,
- session.accessToken!,
- );
+ const result = await categorizeSender(senderAddress, user, gmail);
revalidatePath("/smart-categories");
@@ -375,123 +116,6 @@ export const categorizeSenderAction = withActionInstrumentation(
},
);
-export async function categorizeSender(
- senderAddress: string,
- user: Pick & UserAIFields,
- gmail: gmail_v1.Gmail,
- accessToken: string,
-) {
- const categories = await getUserCategories(user.id);
-
- if (categories.length === 0) return { categoryId: undefined };
-
- const previousEmails = await getPreviousEmails(gmail, senderAddress);
-
- const aiResult = await aiCategorizeSender({
- user,
- sender: senderAddress,
- previousEmails,
- categories,
- });
-
- if (aiResult) {
- const { newsletter } = await updateSenderCategory({
- sender: senderAddress,
- categories,
- categoryName: aiResult.category,
- userId: user.id,
- });
-
- return { categoryId: newsletter.categoryId };
- }
- console.error(`No AI result for sender: ${senderAddress}`);
-
- return { categoryId: undefined };
-}
-
-async function getPreviousEmails(gmail: gmail_v1.Gmail, sender: string) {
- const threadsFromSender = await getThreadsFromSender(gmail, sender, 3);
-
- const previousEmails = threadsFromSender
- .map((t) => t?.snippet)
- .filter(isDefined);
-
- return previousEmails;
-}
-
-async function updateSenderCategory({
- userId,
- sender,
- categories,
- categoryName,
-}: {
- userId: string;
- sender: string;
- categories: { id: string; name: string }[];
- categoryName: string;
-}) {
- let category = categories.find((c) => c.name === categoryName);
- let newCategory: Category | undefined;
-
- if (!category) {
- // create category
- newCategory = await prisma.category.create({
- data: {
- name: categoryName,
- userId,
- // color: getRandomColor(),
- },
- });
- category = newCategory;
- }
-
- // save category
- const newsletter = await prisma.newsletter.upsert({
- where: { email_userId: { email: sender, userId } },
- update: { categoryId: category.id },
- create: {
- email: sender,
- userId,
- categoryId: category.id,
- },
- });
-
- return {
- newCategory,
- newsletter,
- };
-}
-
-// TODO: what if user doesn't have all these categories set up?
-// Use static rules to categorize senders if we can, before sending to LLM
-function preCategorizeSendersWithStaticRules(
- senders: string[],
-): { sender: string; category: SenderCategory | undefined }[] {
- return senders.map((sender) => {
- // if the sender is @gmail.com, @yahoo.com, etc.
- // then mark as "Unknown" (LLM will categorize these as "Personal")
- const personalEmailDomains = [
- "gmail.com",
- "googlemail.com",
- "yahoo.com",
- "hotmail.com",
- "outlook.com",
- "aol.com",
- ];
-
- if (personalEmailDomains.some((domain) => sender.includes(`@${domain}>`)))
- return { sender, category: defaultCategory.UNKNOWN.name };
-
- if (isNewsletterSender(sender))
- return { sender, category: defaultCategory.NEWSLETTER.name };
-
- if (isReceiptSender(sender))
- return { sender, category: defaultCategory.RECEIPT.name };
-
- return { sender, category: undefined };
- });
-}
-
export const changeSenderCategoryAction = withActionInstrumentation(
"changeSenderCategory",
async ({ sender, categoryId }: { sender: string; categoryId: string }) => {
diff --git a/apps/web/utils/actions/client.ts b/apps/web/utils/actions/client.ts
index 1c8efd9f26..555f01cbe6 100644
--- a/apps/web/utils/actions/client.ts
+++ b/apps/web/utils/actions/client.ts
@@ -10,8 +10,8 @@ export async function onAutoArchive(from: string, gmailLabelId?: string) {
handleActionResult(result, "Auto archive enabled!");
}
-export async function onDeleteFilter(gmailLabelId: string) {
- const result = await deleteFilterAction(gmailLabelId);
+export async function onDeleteFilter(filterId: string) {
+ const result = await deleteFilterAction(filterId);
handleActionResult(result, "Auto archive disabled!");
}
diff --git a/apps/web/utils/actions/helpers.ts b/apps/web/utils/actions/helpers.ts
index af5a9fcb41..d1a557627d 100644
--- a/apps/web/utils/actions/helpers.ts
+++ b/apps/web/utils/actions/helpers.ts
@@ -1,10 +1,21 @@
import { auth } from "@/app/api/auth/[...nextauth]/auth";
import { getGmailClient } from "@/utils/gmail/client";
+import type { ActionError } from "@/utils/error";
+import type { gmail_v1 } from "@googleapis/gmail";
+import type { Session } from "next-auth";
-export async function getSessionAndGmailClient() {
+export async function getSessionAndGmailClient(): Promise<
+ | ActionError
+ | {
+ gmail: gmail_v1.Gmail;
+ user: { id: string; email: string };
+ session: Session;
+ }
+> {
const session = await auth();
if (!session?.user.email) return { error: "Not logged in" };
const gmail = getGmailClient(session);
+ if (!gmail) return { error: "Failed to get Gmail" };
return {
gmail,
user: { id: session.user.id, email: session.user.email },
diff --git a/apps/web/utils/actions/mail.ts b/apps/web/utils/actions/mail.ts
index f59baab6e1..f38198ef58 100644
--- a/apps/web/utils/actions/mail.ts
+++ b/apps/web/utils/actions/mail.ts
@@ -19,6 +19,7 @@ import {
} from "@/utils/gmail/filter";
import { getSessionAndGmailClient } from "@/utils/actions/helpers";
import { withActionInstrumentation } from "@/utils/actions/middleware";
+import { isActionError } from "@/utils/error";
// do not return functions to the client or we'll get an error
const isStatusOk = (status: number) => status >= 200 && status < 300;
@@ -26,9 +27,9 @@ const isStatusOk = (status: number) => status >= 200 && status < 300;
export const archiveThreadAction = withActionInstrumentation(
"archiveThread",
async (threadId: string, labelId?: string) => {
- const { gmail, user, error } = await getSessionAndGmailClient();
- if (error) return { error };
- if (!gmail) return { error: "Could not load Gmail" };
+ const sessionResult = await getSessionAndGmailClient();
+ if (isActionError(sessionResult)) return sessionResult;
+ const { gmail, user } = sessionResult;
const res = await archiveThread({
gmail,
@@ -45,9 +46,9 @@ export const archiveThreadAction = withActionInstrumentation(
export const trashThreadAction = withActionInstrumentation(
"trashThread",
async (threadId: string) => {
- const { gmail, user, error } = await getSessionAndGmailClient();
- if (error) return { error };
- if (!gmail) return { error: "Could not load Gmail" };
+ const sessionResult = await getSessionAndGmailClient();
+ if (isActionError(sessionResult)) return sessionResult;
+ const { gmail, user } = sessionResult;
const res = await trashThread({
gmail,
@@ -63,9 +64,9 @@ export const trashThreadAction = withActionInstrumentation(
export const trashMessageAction = withActionInstrumentation(
"trashMessage",
async (messageId: string) => {
- const { gmail, error } = await getSessionAndGmailClient();
- if (error) return { error };
- if (!gmail) return { error: "Could not load Gmail" };
+ const sessionResult = await getSessionAndGmailClient();
+ if (isActionError(sessionResult)) return sessionResult;
+ const { gmail } = sessionResult;
const res = await trashMessage({ gmail, messageId });
@@ -76,9 +77,9 @@ export const trashMessageAction = withActionInstrumentation(
export const markReadThreadAction = withActionInstrumentation(
"markReadThread",
async (threadId: string, read: boolean) => {
- const { gmail, error } = await getSessionAndGmailClient();
- if (error) return { error };
- if (!gmail) return { error: "Could not load Gmail" };
+ const sessionResult = await getSessionAndGmailClient();
+ if (isActionError(sessionResult)) return sessionResult;
+ const { gmail } = sessionResult;
const res = await markReadThread({ gmail, threadId, read });
@@ -90,9 +91,9 @@ export const markReadThreadAction = withActionInstrumentation(
export const markImportantMessageAction = withActionInstrumentation(
"markImportantMessage",
async (messageId: string, important: boolean) => {
- const { gmail, error } = await getSessionAndGmailClient();
- if (error) return { error };
- if (!gmail) return { error: "Could not load Gmail" };
+ const sessionResult = await getSessionAndGmailClient();
+ if (isActionError(sessionResult)) return sessionResult;
+ const { gmail } = sessionResult;
const res = await markImportantMessage({ gmail, messageId, important });
@@ -104,9 +105,9 @@ export const markImportantMessageAction = withActionInstrumentation(
export const markSpamThreadAction = withActionInstrumentation(
"markSpamThread",
async (threadId: string) => {
- const { gmail, error } = await getSessionAndGmailClient();
- if (error) return { error };
- if (!gmail) return { error: "Could not load Gmail" };
+ const sessionResult = await getSessionAndGmailClient();
+ if (isActionError(sessionResult)) return sessionResult;
+ const { gmail } = sessionResult;
const res = await markSpam({ gmail, threadId });
@@ -118,9 +119,9 @@ export const markSpamThreadAction = withActionInstrumentation(
export const createAutoArchiveFilterAction = withActionInstrumentation(
"createAutoArchiveFilter",
async (from: string, gmailLabelId?: string) => {
- const { gmail, error } = await getSessionAndGmailClient();
- if (error) return { error };
- if (!gmail) return { error: "Could not load Gmail" };
+ const sessionResult = await getSessionAndGmailClient();
+ if (isActionError(sessionResult)) return sessionResult;
+ const { gmail } = sessionResult;
const res = await createAutoArchiveFilter({ gmail, from, gmailLabelId });
@@ -132,9 +133,9 @@ export const createAutoArchiveFilterAction = withActionInstrumentation(
export const createFilterAction = withActionInstrumentation(
"createFilter",
async (from: string, gmailLabelId: string) => {
- const { gmail, error } = await getSessionAndGmailClient();
- if (error) return { error };
- if (!gmail) return { error: "Could not load Gmail" };
+ const sessionResult = await getSessionAndGmailClient();
+ if (isActionError(sessionResult)) return sessionResult;
+ const { gmail } = sessionResult;
const res = await createFilter({
gmail,
@@ -151,9 +152,9 @@ export const createFilterAction = withActionInstrumentation(
export const deleteFilterAction = withActionInstrumentation(
"deleteFilter",
async (id: string) => {
- const { gmail, error } = await getSessionAndGmailClient();
- if (error) return { error };
- if (!gmail) return { error: "Could not load Gmail" };
+ const sessionResult = await getSessionAndGmailClient();
+ if (isActionError(sessionResult)) return sessionResult;
+ const { gmail } = sessionResult;
const res = await deleteFilter({ gmail, id });
diff --git a/apps/web/utils/actions/middleware.ts b/apps/web/utils/actions/middleware.ts
index 43667b412d..f089a3fe61 100644
--- a/apps/web/utils/actions/middleware.ts
+++ b/apps/web/utils/actions/middleware.ts
@@ -43,6 +43,7 @@ export function withActionInstrumentation<
},
async () => {
try {
+ console.log(`Action: ${name}`);
const res = await action(...args);
if (!res) {
diff --git a/apps/web/utils/ai/actions.ts b/apps/web/utils/ai/actions.ts
index 8bf7c5ced9..0f6b7d1359 100644
--- a/apps/web/utils/ai/actions.ts
+++ b/apps/web/utils/ai/actions.ts
@@ -372,7 +372,7 @@ export const runActionFunction = async (
action: ActionItem,
userEmail: string,
) => {
- logger.log("Running action", { actionType: action.type, userEmail });
+ logger.info("Running action", { actionType: action.type, userEmail });
logger.trace("Running action:", action);
const { type, ...args } = action;
diff --git a/apps/web/utils/ai/categorize-sender/ai-categorize-senders.ts b/apps/web/utils/ai/categorize-sender/ai-categorize-senders.ts
index fa1f6cd043..bc195f87bf 100644
--- a/apps/web/utils/ai/categorize-sender/ai-categorize-senders.ts
+++ b/apps/web/utils/ai/categorize-sender/ai-categorize-senders.ts
@@ -1,11 +1,12 @@
import { z } from "zod";
import { chatCompletionObject } from "@/utils/llms";
import { isDefined } from "@/utils/types";
-import type { UserAIFields } from "@/utils/llms/types";
-import type { Category, User } from "@prisma/client";
+import type { UserEmailWithAI } from "@/utils/llms/types";
+import type { Category } from "@prisma/client";
import { formatCategoriesForPrompt } from "@/utils/ai/categorize-sender/format-categories";
export const REQUEST_MORE_INFORMATION_CATEGORY = "RequestMoreInformation";
+export const UNKNOWN_CATEGORY = "Unknown";
const categorizeSendersSchema = z.object({
senders: z.array(
@@ -22,7 +23,7 @@ export async function aiCategorizeSenders({
senders,
categories,
}: {
- user: Pick & UserAIFields;
+ user: UserEmailWithAI;
senders: { emailAddress: string; snippets: string[] }[];
categories: Pick[];
}): Promise<
diff --git a/apps/web/utils/ai/categorize-sender/ai-categorize-single-sender.ts b/apps/web/utils/ai/categorize-sender/ai-categorize-single-sender.ts
index 189c39628b..b4be7bb11a 100644
--- a/apps/web/utils/ai/categorize-sender/ai-categorize-single-sender.ts
+++ b/apps/web/utils/ai/categorize-sender/ai-categorize-single-sender.ts
@@ -1,7 +1,7 @@
import { z } from "zod";
import { chatCompletionObject } from "@/utils/llms";
-import type { UserAIFields } from "@/utils/llms/types";
-import type { Category, User } from "@prisma/client";
+import type { UserEmailWithAI } from "@/utils/llms/types";
+import type { Category } from "@prisma/client";
import { formatCategoriesForPrompt } from "@/utils/ai/categorize-sender/format-categories";
const categorizeSenderSchema = z.object({
@@ -18,7 +18,7 @@ export async function aiCategorizeSender({
previousEmails,
categories,
}: {
- user: Pick & UserAIFields;
+ user: UserEmailWithAI;
sender: string;
previousEmails: string[];
categories: Pick[];
diff --git a/apps/web/utils/ai/choose-rule/ai-choose-args.ts b/apps/web/utils/ai/choose-rule/ai-choose-args.ts
index e65e8f4bb2..5956fb68f5 100644
--- a/apps/web/utils/ai/choose-rule/ai-choose-args.ts
+++ b/apps/web/utils/ai/choose-rule/ai-choose-args.ts
@@ -76,12 +76,12 @@ async function getArgsAiResponse({
selectedRule: RuleWithActions;
actionsRequiringAi: ActionRequiringAi[];
}): Promise {
- logger.log(
+ logger.info(
`Generating args for rule ${selectedRule.name} (${selectedRule.id})`,
);
if (!actionsRequiringAi.length) {
- logger.log(
+ logger.info(
`Skipping. No parameters for rule ${selectedRule.name} (${selectedRule.id})`,
);
return;
@@ -120,7 +120,7 @@ ${stringifyEmail(email, 3000)}
),
);
- logger.log("Calling chat completion tools");
+ logger.info("Calling chat completion tools");
logger.trace(`System: ${system}`);
logger.trace(`Prompt: ${prompt}`);
// logger.trace("Zod parameters:", zodToJsonSchema(zodParameters));
diff --git a/apps/web/utils/ai/choose-rule/execute.ts b/apps/web/utils/ai/choose-rule/execute.ts
index 38f8ee2217..5ddc748610 100644
--- a/apps/web/utils/ai/choose-rule/execute.ts
+++ b/apps/web/utils/ai/choose-rule/execute.ts
@@ -23,7 +23,7 @@ export async function executeAct({
email: EmailForAction;
userEmail: string;
}) {
- logger.log(
+ logger.info(
`Executing rule: ${executedRule.id} for rule ${executedRule.ruleId}`,
);
@@ -48,7 +48,7 @@ export async function executeAct({
});
if (pendingRules.count === 0) {
- logger.log(
+ logger.info(
`Executed rule ${executedRule.id} is not pending or does not exist`,
);
return;
diff --git a/apps/web/utils/ai/choose-rule/run-rules.ts b/apps/web/utils/ai/choose-rule/run-rules.ts
index 3cbc432309..deab661398 100644
--- a/apps/web/utils/ai/choose-rule/run-rules.ts
+++ b/apps/web/utils/ai/choose-rule/run-rules.ts
@@ -106,7 +106,7 @@ export async function runRulesOnMessage({
if (aiResponse.handled) return { handled: true };
- logger.log(
+ logger.info(
`No rules matched. ${user.email} ${message.threadId} ${message.id}`,
);
logger.trace(aiResponse);
diff --git a/apps/web/utils/categorize/senders/categorize.ts b/apps/web/utils/categorize/senders/categorize.ts
new file mode 100644
index 0000000000..4e642cc3ae
--- /dev/null
+++ b/apps/web/utils/categorize/senders/categorize.ts
@@ -0,0 +1,172 @@
+import type { gmail_v1 } from "@googleapis/gmail";
+import prisma from "@/utils/prisma";
+import { aiCategorizeSenders } from "@/utils/ai/categorize-sender/ai-categorize-senders";
+import { defaultCategory, type SenderCategory } from "@/utils/categories";
+import { isNewsletterSender } from "@/utils/ai/group/find-newsletters";
+import { isReceiptSender } from "@/utils/ai/group/find-receipts";
+import { aiCategorizeSender } from "@/utils/ai/categorize-sender/ai-categorize-single-sender";
+import { getThreadsFromSender } from "@/utils/gmail/thread";
+import { isDefined } from "@/utils/types";
+import type { Category } from "@prisma/client";
+import { getUserCategories } from "@/utils/category.server";
+import type { User } from "@prisma/client";
+import type { UserAIFields, UserEmailWithAI } from "@/utils/llms/types";
+import { createScopedLogger } from "@/utils/logger";
+
+const logger = createScopedLogger("categorize/senders");
+
+export async function categorizeSender(
+ senderAddress: string,
+ user: Pick & UserAIFields,
+ gmail: gmail_v1.Gmail,
+ userCategories?: Pick[],
+) {
+ const categories = userCategories || (await getUserCategories(user.id));
+ if (categories.length === 0) return { categoryId: undefined };
+
+ const previousEmails = await getPreviousEmails(gmail, senderAddress);
+
+ const aiResult = await aiCategorizeSender({
+ user,
+ sender: senderAddress,
+ previousEmails,
+ categories,
+ });
+
+ if (aiResult) {
+ const { newsletter } = await updateSenderCategory({
+ sender: senderAddress,
+ categories,
+ categoryName: aiResult.category,
+ userId: user.id,
+ });
+
+ return { categoryId: newsletter.categoryId };
+ }
+
+ logger.error(`No AI result for sender: ${senderAddress}`);
+
+ return { categoryId: undefined };
+}
+
+async function getPreviousEmails(gmail: gmail_v1.Gmail, sender: string) {
+ const threadsFromSender = await getThreadsFromSender(gmail, sender, 3);
+
+ const previousEmails = threadsFromSender
+ .map((t) => t?.snippet)
+ .filter(isDefined);
+
+ return previousEmails;
+}
+
+export async function updateSenderCategory({
+ userId,
+ sender,
+ categories,
+ categoryName,
+}: {
+ userId: string;
+ sender: string;
+ categories: Pick[];
+ categoryName: string;
+}) {
+ let category = categories.find((c) => c.name === categoryName);
+ let newCategory: Category | undefined;
+
+ if (!category) {
+ // create category
+ newCategory = await prisma.category.create({
+ data: {
+ name: categoryName,
+ userId,
+ // color: getRandomColor(),
+ },
+ });
+ category = newCategory;
+ }
+
+ // save category
+ const newsletter = await prisma.newsletter.upsert({
+ where: { email_userId: { email: sender, userId } },
+ update: { categoryId: category.id },
+ create: {
+ email: sender,
+ userId,
+ categoryId: category.id,
+ },
+ });
+
+ return {
+ newCategory,
+ newsletter,
+ };
+}
+
+// TODO: what if user doesn't have all these categories set up?
+// Use static rules to categorize senders if we can, before sending to LLM
+function preCategorizeSendersWithStaticRules(
+ senders: string[],
+): { sender: string; category: SenderCategory | undefined }[] {
+ return senders.map((sender) => {
+ // if the sender is @gmail.com, @yahoo.com, etc.
+ // then mark as "Unknown" (LLM will categorize these as "Personal")
+ const personalEmailDomains = [
+ "gmail.com",
+ "googlemail.com",
+ "yahoo.com",
+ "hotmail.com",
+ "outlook.com",
+ "aol.com",
+ ];
+
+ if (personalEmailDomains.some((domain) => sender.includes(`@${domain}>`)))
+ return { sender, category: defaultCategory.UNKNOWN.name };
+
+ if (isNewsletterSender(sender))
+ return { sender, category: defaultCategory.NEWSLETTER.name };
+
+ if (isReceiptSender(sender))
+ return { sender, category: defaultCategory.RECEIPT.name };
+
+ return { sender, category: undefined };
+ });
+}
+
+export async function getCategories(userId: string) {
+ const categories = await getUserCategories(userId);
+ if (categories.length === 0) return { error: "No categories found" };
+ return { categories };
+}
+
+export async function categorizeWithAi({
+ user,
+ sendersWithSnippets,
+ categories,
+}: {
+ user: UserEmailWithAI;
+ sendersWithSnippets: Map;
+ categories: Pick[];
+}) {
+ const categorizedSenders = preCategorizeSendersWithStaticRules(
+ Array.from(sendersWithSnippets.keys()),
+ );
+
+ const sendersToCategorizeWithAi = categorizedSenders
+ .filter((sender) => !sender.category)
+ .map((sender) => sender.sender);
+
+ logger.info("Found senders to categorize with AI", {
+ count: sendersToCategorizeWithAi.length,
+ });
+
+ const aiResults = await aiCategorizeSenders({
+ user,
+ senders: sendersToCategorizeWithAi.map((sender) => ({
+ emailAddress: sender,
+ snippets: sendersWithSnippets.get(sender) || [],
+ })),
+ categories,
+ });
+
+ return [...categorizedSenders, ...aiResults];
+}
diff --git a/apps/web/utils/config.ts b/apps/web/utils/config.ts
index fa775a5e21..3d00ccfb99 100644
--- a/apps/web/utils/config.ts
+++ b/apps/web/utils/config.ts
@@ -1,11 +1,6 @@
-import { env } from "@/env";
-
export const AI_GENERATED_FIELD_VALUE = "___AI_GENERATE___";
-export const aiHomePath = "/automation";
-export const appHomePath = env.NEXT_PUBLIC_DISABLE_TINYBIRD
- ? aiHomePath
- : "/bulk-unsubscribe";
+export const appHomePath = "/automation";
export const GroupName = {
NEWSLETTER: "Newsletters",
diff --git a/apps/web/utils/date.ts b/apps/web/utils/date.ts
index 65e9fa16ad..5f98cbe1ab 100644
--- a/apps/web/utils/date.ts
+++ b/apps/web/utils/date.ts
@@ -34,7 +34,8 @@ export function dateToSeconds(date: Date) {
return Math.floor(date.getTime() / 1000);
}
-export const ONE_HOUR_MS = 1000 * 60 * 60;
+export const ONE_MINUTE_MS = 1000 * 60;
+export const ONE_HOUR_MS = ONE_MINUTE_MS * 60;
export const ONE_DAY_MS = ONE_HOUR_MS * 24;
export const ONE_MONTH_MS = ONE_DAY_MS * 30;
export const ONE_YEAR_MS = ONE_DAY_MS * 365;
diff --git a/apps/web/utils/gmail/client.ts b/apps/web/utils/gmail/client.ts
index 9ca23e2ceb..2b4ffa46ee 100644
--- a/apps/web/utils/gmail/client.ts
+++ b/apps/web/utils/gmail/client.ts
@@ -53,11 +53,12 @@ export const getGmailClientWithRefresh = async (
// may throw `invalid_grant` error
try {
const tokens = await auth.refreshAccessToken();
+ const newAccessToken = tokens.credentials.access_token;
- if (tokens.credentials.access_token !== session.accessToken) {
+ if (newAccessToken !== session.accessToken) {
await saveRefreshToken(
{
- access_token: tokens.credentials.access_token ?? undefined,
+ access_token: newAccessToken ?? undefined,
expires_at: tokens.credentials.expiry_date
? Math.floor(tokens.credentials.expiry_date / 1000)
: undefined,
diff --git a/apps/web/utils/gmail/message.ts b/apps/web/utils/gmail/message.ts
index 0a9f27ff58..637f46b57f 100644
--- a/apps/web/utils/gmail/message.ts
+++ b/apps/web/utils/gmail/message.ts
@@ -40,6 +40,7 @@ export async function getMessagesBatch(
.map((message) => {
if (isBatchError(message)) {
// TODO need a better way to handle this
+ // https://claude.ai/chat/3984ad45-f5d4-4196-8309-9b5dc9211d05
console.error(
`Error fetching message ${message.error.code} ${message.error.message}`,
);
diff --git a/apps/web/utils/internal-api.ts b/apps/web/utils/internal-api.ts
new file mode 100644
index 0000000000..50a8e0e8e9
--- /dev/null
+++ b/apps/web/utils/internal-api.ts
@@ -0,0 +1,9 @@
+import { env } from "@/env";
+
+export const INTERNAL_API_KEY_HEADER = "x-api-key";
+
+export const isValidInternalApiKey = (headers: Headers): boolean => {
+ if (!env.INTERNAL_API_KEY) return false;
+ const apiKey = headers.get(INTERNAL_API_KEY_HEADER);
+ return apiKey === env.INTERNAL_API_KEY;
+};
diff --git a/apps/web/utils/llms/types.ts b/apps/web/utils/llms/types.ts
index 6e438355d1..3a39240218 100644
--- a/apps/web/utils/llms/types.ts
+++ b/apps/web/utils/llms/types.ts
@@ -1,3 +1,4 @@
import type { User } from "@prisma/client";
export type UserAIFields = Pick;
+export type UserEmailWithAI = Pick & UserAIFields;
diff --git a/apps/web/utils/logger.ts b/apps/web/utils/logger.ts
index 09ecf97a57..a733b774ef 100644
--- a/apps/web/utils/logger.ts
+++ b/apps/web/utils/logger.ts
@@ -1,8 +1,8 @@
-type LogLevel = "log" | "error" | "warn" | "trace";
+type LogLevel = "info" | "error" | "warn" | "trace";
type LogMessage = string | Record;
const colors = {
- log: "\x1b[0m", // white
+ info: "\x1b[0m", // white
error: "\x1b[31m", // red
warn: "\x1b[33m", // yellow
trace: "\x1b[36m", // cyan
@@ -25,8 +25,8 @@ export function createScopedLogger(scope: string) {
};
return {
- log: (message: LogMessage, ...args: unknown[]) =>
- console.log(formatMessage("log", message), ...args),
+ info: (message: LogMessage, ...args: unknown[]) =>
+ console.log(formatMessage("info", message), ...args),
error: (message: LogMessage, ...args: unknown[]) =>
console.error(formatMessage("error", message), ...args),
diff --git a/apps/web/utils/redis/categorization-progress.ts b/apps/web/utils/redis/categorization-progress.ts
new file mode 100644
index 0000000000..3c0e69a393
--- /dev/null
+++ b/apps/web/utils/redis/categorization-progress.ts
@@ -0,0 +1,65 @@
+import { z } from "zod";
+import { redis } from "@/utils/redis";
+
+const categorizationProgressSchema = z.object({
+ totalItems: z.number().int().min(0),
+ completedItems: z.number().int().min(0),
+});
+type RedisCategorizationProgress = z.infer;
+
+function getKey(userId: string) {
+ return `categorization-progress:${userId}`;
+}
+
+export async function getCategorizationProgress({
+ userId,
+}: {
+ userId: string;
+}) {
+ const key = getKey(userId);
+ const progress = await redis.get(key);
+ if (!progress) return null;
+ return progress;
+}
+
+export async function saveCategorizationTotalItems({
+ userId,
+ totalItems,
+}: {
+ userId: string;
+ totalItems: number;
+}) {
+ const key = getKey(userId);
+ const existingProgress = await getCategorizationProgress({ userId });
+ await redis.set(key, { ...existingProgress, totalItems });
+}
+
+export async function saveCategorizationProgress({
+ userId,
+ incrementCompleted,
+}: {
+ userId: string;
+ incrementCompleted: number;
+}) {
+ const existingProgress = await getCategorizationProgress({ userId });
+ if (!existingProgress) return null;
+
+ const key = getKey(userId);
+ const updatedProgress: RedisCategorizationProgress = {
+ ...existingProgress,
+ completedItems: (existingProgress.completedItems || 0) + incrementCompleted,
+ };
+
+ // Store progress for 2 minutes
+ await redis.set(key, updatedProgress, { ex: 2 * 60 });
+ return updatedProgress;
+}
+
+export async function deleteCategorizationProgress({
+ userId,
+}: {
+ userId: string;
+}) {
+ const key = getKey(userId);
+ await redis.del(key);
+}
diff --git a/apps/web/utils/upstash.ts b/apps/web/utils/upstash.ts
new file mode 100644
index 0000000000..10e3f37225
--- /dev/null
+++ b/apps/web/utils/upstash.ts
@@ -0,0 +1,123 @@
+import { Client } from "@upstash/qstash";
+import { env } from "@/env";
+import { INTERNAL_API_KEY_HEADER } from "@/utils/internal-api";
+import { SafeError } from "@/utils/error";
+import { sleep } from "@/utils/sleep";
+import type { AiCategorizeSenders } from "@/app/api/user/categorize/senders/batch/handle-batch-validation";
+import { createScopedLogger } from "@/utils/logger";
+
+const logger = createScopedLogger("upstash");
+
+function getQstashClient() {
+ if (!env.QSTASH_TOKEN) return null;
+ return new Client({ token: env.QSTASH_TOKEN });
+}
+
+export async function publishToQstash(url: string, body: any) {
+ const client = getQstashClient();
+
+ if (client) return await client.publishJSON({ url, body });
+
+ // Fallback to fetch if Qstash client is not found
+ logger.warn("Qstash client not found");
+
+ if (!env.INTERNAL_API_KEY)
+ throw new SafeError("Internal API key must be set");
+
+ // Don't await. Run in background
+ fetch(`${url}/simple`, {
+ method: "POST",
+ headers: {
+ "Content-Type": "application/json",
+ [INTERNAL_API_KEY_HEADER]: env.INTERNAL_API_KEY,
+ },
+ body: JSON.stringify(body),
+ });
+ // Wait for 100ms to ensure the request is sent
+ await sleep(100);
+}
+
+export async function publishToQstashQueue({
+ queueName,
+ parallelism,
+ url,
+ body,
+}: {
+ queueName: string;
+ parallelism: number;
+ url: string;
+ body: any;
+}) {
+ const client = getQstashClient();
+
+ if (client) {
+ const queue = client.queue({ queueName });
+ queue.upsert({ parallelism });
+ return await queue.enqueueJSON({ url, body });
+ }
+
+ // Fallback to fetch if Qstash client is not found
+ logger.warn("Qstash client not found");
+
+ if (!env.INTERNAL_API_KEY)
+ throw new SafeError("Internal API key must be set");
+
+ // Don't await. Run in background
+ fetch(`${url}/simple`, {
+ method: "POST",
+ headers: {
+ "Content-Type": "application/json",
+ [INTERNAL_API_KEY_HEADER]: env.INTERNAL_API_KEY,
+ },
+ body: JSON.stringify(body),
+ });
+ // Wait for 100ms to ensure the request is sent
+ await sleep(100);
+}
+
+/**
+ * Publishes sender categorization tasks to QStash queue in batches
+ * Splits large arrays of senders into chunks of BATCH_SIZE to prevent overwhelming the system
+ */
+export async function publishToAiCategorizeSendersQueue(
+ body: AiCategorizeSenders,
+) {
+ const url = `${env.WEBHOOK_URL || env.NEXT_PUBLIC_BASE_URL}/api/user/categorize/senders/batch`;
+
+ // Split senders into smaller chunks to process in batches
+ const BATCH_SIZE = 50;
+ const chunks = chunkArray(body.senders, BATCH_SIZE);
+
+ logger.trace("Publishing to AI categorize senders queue in chunks", {
+ url,
+ totalSenders: body.senders.length,
+ numberOfChunks: chunks.length,
+ });
+
+ // Process all chunks in parallel, each as a separate queue item
+ await Promise.all(
+ chunks.map((senderChunk) =>
+ publishToQstashQueue({
+ queueName: "ai-categorize-senders",
+ parallelism: 3, // Allow up to 3 concurrent jobs from this queue
+ url,
+ body: {
+ userId: body.userId,
+ senders: senderChunk,
+ },
+ }),
+ ),
+ );
+}
+
+/**
+ * Utility function to split an array into smaller chunks of specified size
+ * @param array The array to split into chunks
+ * @param size Maximum size of each chunk
+ * @returns Array of chunks
+ */
+function chunkArray(array: T[], size: number): T[][] {
+ return Array.from({ length: Math.ceil(array.length / size) }, (_, index) =>
+ array.slice(index * size, (index + 1) * size),
+ );
+}
diff --git a/apps/web/utils/user/validate.ts b/apps/web/utils/user/validate.ts
new file mode 100644
index 0000000000..4dca46c003
--- /dev/null
+++ b/apps/web/utils/user/validate.ts
@@ -0,0 +1,25 @@
+import { hasAiAccess } from "@/utils/premium";
+import prisma from "@/utils/prisma";
+
+export async function validateUserAndAiAccess(userId: string) {
+ const user = await prisma.user.findUnique({
+ where: { id: userId },
+ select: {
+ id: true,
+ email: true,
+ aiProvider: true,
+ aiModel: true,
+ aiApiKey: true,
+ premium: { select: { aiAutomationAccess: true } },
+ },
+ });
+ if (!user) return { error: "User not found" };
+
+ const userHasAiAccess = hasAiAccess(
+ user.premium?.aiAutomationAccess,
+ user.aiApiKey,
+ );
+ if (!userHasAiAccess) return { error: "Please upgrade for AI access" };
+
+ return { user };
+}
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index e4f97189e1..1f4124cc1a 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -242,6 +242,9 @@ importers:
'@tremor/react':
specifier: ^3.18.3
version: 3.18.3(react-dom@18.3.1(react@18.3.1))(react@18.3.1)(tailwindcss@3.4.14(ts-node@10.9.2(@swc/core@1.6.5(@swc/helpers@0.5.5))(@types/node@22.8.6)(typescript@5.6.3)))
+ '@upstash/qstash':
+ specifier: ^2.7.16
+ version: 2.7.16
'@upstash/redis':
specifier: ^1.34.3
version: 1.34.3
@@ -5166,6 +5169,9 @@ packages:
'@ungap/structured-clone@1.2.0':
resolution: {integrity: sha512-zuVdFrMJiuCDQUMCzQaD6KL28MjnqqN8XnAqiEq9PNm/hCPTSGfrXCOfwj1ow4LFb/tNymJPwsNbVePc1xFqrQ==}
+ '@upstash/qstash@2.7.16':
+ resolution: {integrity: sha512-1G1FP+P0/HxQsUe2n+Di8hJBSA3ZV6Y0mF5o0rdnxbmXXHEzCqsrSsRCFbllhd8SS6rzwPhtEQssk2ZKppnGMg==}
+
'@upstash/redis@1.34.3':
resolution: {integrity: sha512-VT25TyODGy/8ljl7GADnJoMmtmJ1F8d84UXfGonRRF8fWYJz7+2J6GzW+a6ETGtk4OyuRTt7FRSvFG5GvrfSdQ==}
@@ -8574,6 +8580,10 @@ packages:
resolution: {integrity: sha512-dBpDMdxv9Irdq66304OLfEmQ9tbNRFnFTuZiLo+bD+r332bBmMJ8GBLXklIXXgxd3+v9+KUnZaUR5PJMa75Gsg==}
engines: {node: '>= 0.4.0'}
+ neverthrow@7.2.0:
+ resolution: {integrity: sha512-iGBUfFB7yPczHHtA8dksKTJ9E8TESNTAx1UQWW6TzMF280vo9jdPYpLUXrMN1BCkPdHFdNG3fxOt2CUad8KhAw==}
+ engines: {node: '>=18'}
+
next-auth@5.0.0-beta.22:
resolution: {integrity: sha512-QGBo9HGOjmnJBHGXvtFztl0tM5tL0porDlk74HVoCCzXd986ApOlIW3EmiCuho7YzEopgkFiwwmcXpoCrHAtYw==}
peerDependencies:
@@ -16846,6 +16856,12 @@ snapshots:
'@ungap/structured-clone@1.2.0': {}
+ '@upstash/qstash@2.7.16':
+ dependencies:
+ crypto-js: 4.2.0
+ jose: 5.9.3
+ neverthrow: 7.2.0
+
'@upstash/redis@1.34.3':
dependencies:
crypto-js: 4.2.0
@@ -21165,6 +21181,8 @@ snapshots:
netmask@2.0.2: {}
+ neverthrow@7.2.0: {}
+
next-auth@5.0.0-beta.22(next@14.2.15(@babel/core@7.24.7)(@opentelemetry/api@1.9.0)(react-dom@18.3.1(react@18.3.1))(react@18.3.1))(nodemailer@6.9.16)(react@18.3.1):
dependencies:
'@auth/core': 0.35.3(nodemailer@6.9.16)
diff --git a/turbo.json b/turbo.json
index 319f8c1d64..fb81713b16 100644
--- a/turbo.json
+++ b/turbo.json
@@ -8,8 +8,7 @@
"DATABASE_URL",
"DIRECT_URL",
"NEXTAUTH_SECRET",
- "GOOGLE_CLIENT_ID",
- "GOOGLE_CLIENT_SECRET",
+ "NEXTAUTH_URL",
"GOOGLE_CLIENT_ID",
"GOOGLE_CLIENT_SECRET",
"OPENAI_API_KEY",
@@ -19,6 +18,9 @@
"BEDROCK_REGION",
"UPSTASH_REDIS_URL",
"UPSTASH_REDIS_TOKEN",
+ "QSTASH_TOKEN",
+ "QSTASH_CURRENT_SIGNING_KEY",
+ "QSTASH_NEXT_SIGNING_KEY",
"GOOGLE_PUBSUB_TOPIC_NAME",
"GOOGLE_PUBSUB_VERIFICATION_TOKEN",
"SENTRY_AUTH_TOKEN",
@@ -38,6 +40,8 @@
"CRON_SECRET",
"LOOPS_API_SECRET",
"ADMINS",
+ "WEBHOOK_URL",
+ "INTERNAL_API_KEY",
"NEXT_PUBLIC_LEMON_STORE_ID",