diff --git a/apps/web/app/(app)/[emailAccountId]/assistant/BulkProcessActivityLog.tsx b/apps/web/app/(app)/[emailAccountId]/assistant/BulkProcessActivityLog.tsx new file mode 100644 index 0000000000..808e631167 --- /dev/null +++ b/apps/web/app/(app)/[emailAccountId]/assistant/BulkProcessActivityLog.tsx @@ -0,0 +1,252 @@ +"use client"; + +import { useEffect, useState } from "react"; +import { CheckCircle2Icon, LoaderIcon } from "lucide-react"; +import useSWR from "swr"; +import type { BatchExecutedRulesResponse } from "@/app/api/user/executed-rules/batch/route"; +import type { ThreadsResponse } from "@/app/api/threads/route"; +import { Badge } from "@/components/Badge"; + +export type ActivityLogEntry = { + id: string; + from: string; + subject: string; + status: "processing" | "completed" | "waiting"; + ruleName?: string; +}; + +export function ActivityLog({ + entries, + processingCount = 0, + paused = false, + title = "Processing Activity", + loading = false, +}: { + entries: ActivityLogEntry[]; + processingCount?: number; + paused?: boolean; + title?: string; + loading?: boolean; +}) { + if (entries.length === 0 && !loading) return null; + + return ( +
+
+

{title}

+ {processingCount > 0 && !paused && ( + + {processingCount} processing + + )} +
+
+
+ {entries.length === 0 && loading && ( +
+ + Fetching emails... +
+ )} + {entries.map((entry) => ( + + ))} +
+
+
+ ); +} + +function ActivityLogRow({ + entry, + paused, +}: { + entry: ActivityLogEntry; + paused: boolean; +}) { + const isCompleted = entry.status === "completed"; + const showSpinner = entry.status === "processing" && !paused; + + return ( +
+ {isCompleted ? ( + + ) : showSpinner ? ( + + ) : ( +
+ )} +
+
+ + {entry.from} + + + {entry.ruleName && ( + + {entry.ruleName} + + )} + {!entry.ruleName && isCompleted && ( + No match + )} + +
+
+ {entry.subject} +
+
+
+ ); +} + +// ============================================================================= +// Smart Component - Data fetching and state management +// ============================================================================= + +type InternalActivityLogEntry = { + threadId: string; + messageId: string; + from: string; + subject: string; + status: "processing" | "completed"; + ruleName?: string; + timestamp: number; +}; + +export function BulkProcessActivityLog({ + threads, + processedThreadIds, + aiQueue, + paused, + loading = false, +}: { + threads: ThreadsResponse["threads"]; + processedThreadIds: Set; + aiQueue: Set; + paused: boolean; + loading?: boolean; +}) { + const [activityLog, setActivityLog] = useState( + [], + ); + + // Clear activity log when a new run starts + useEffect(() => { + if (loading) { + setActivityLog([]); + } + }, [loading]); + + // Get message IDs from processed threads + const messageIds = Array.from(processedThreadIds) + .map((threadId) => { + const thread = threads.find((t) => t.id === threadId); + return thread?.messages?.[thread.messages.length - 1]?.id; + }) + .filter((id): id is string => !!id) + .slice(-20); // Keep last 20 + + // Check if all items in activity log are completed + const allCompleted = + activityLog.length > 0 && + activityLog.every((entry) => entry.status === "completed"); + + // Poll for executed rules - keep polling while there are unprocessed messages + const { data: executedRulesData } = useSWR( + messageIds.length > 0 && !allCompleted + ? `/api/user/executed-rules/batch?messageIds=${messageIds.join(",")}` + : null, + { + refreshInterval: messageIds.length > 0 && !allCompleted ? 2000 : 0, + }, + ); + + // Update activity log when threads are queued or rules are executed + useEffect(() => { + if (!threads.length) return; + + setActivityLog((prev) => { + const existingMessageIds = new Set(prev.map((entry) => entry.messageId)); + const newEntries: InternalActivityLogEntry[] = []; + + for (const threadId of processedThreadIds) { + const thread = threads.find((t) => t.id === threadId); + if (!thread) continue; + + const message = thread.messages?.[thread.messages.length - 1]; + if (!message) continue; + + // Check if already in log (using current state, not stale closure) + if (existingMessageIds.has(message.id)) continue; + + const executedRule = executedRulesData?.rulesMap[message.id]?.[0]; + + newEntries.push({ + threadId: thread.id, + messageId: message.id, + from: message.headers.from || "Unknown", + subject: message.headers.subject || "(No subject)", + status: executedRule ? "completed" : "processing", + ruleName: executedRule?.rule?.name, + timestamp: Date.now(), + }); + + // Track newly added to prevent duplicates within this batch + existingMessageIds.add(message.id); + } + + if (newEntries.length === 0) return prev; + return [...newEntries, ...prev].slice(0, 50); // Keep last 50 + }); + }, [processedThreadIds, executedRulesData, threads]); + + // Update existing entries when rules complete + useEffect(() => { + if (!executedRulesData) return; + + setActivityLog((prev) => + prev.map((entry) => { + if (entry.status === "completed") return entry; + + const executedRule = executedRulesData.rulesMap[entry.messageId]?.[0]; + if (executedRule) { + return { + ...entry, + status: "completed" as const, + ruleName: executedRule.rule?.name, + }; + } + return entry; + }), + ); + }, [executedRulesData]); + + // Transform internal entries to dumb component format + const entries: ActivityLogEntry[] = activityLog.map((entry) => { + const isInQueue = aiQueue.has(entry.threadId); + const isCompleted = entry.status === "completed"; + + return { + id: entry.messageId, + from: entry.from, + subject: entry.subject, + status: isCompleted ? "completed" : isInQueue ? "processing" : "waiting", + ruleName: entry.ruleName, + }; + }); + + // Count items currently being processed (in queue, not completed) + const processingCount = activityLog.filter( + (entry) => aiQueue.has(entry.threadId) && entry.status !== "completed", + ).length; + + return ( + + ); +} diff --git a/apps/web/app/(app)/[emailAccountId]/assistant/BulkRunRules.tsx b/apps/web/app/(app)/[emailAccountId]/assistant/BulkRunRules.tsx index de502f0a08..3fd1b50a31 100644 --- a/apps/web/app/(app)/[emailAccountId]/assistant/BulkRunRules.tsx +++ b/apps/web/app/(app)/[emailAccountId]/assistant/BulkRunRules.tsx @@ -1,23 +1,30 @@ "use client"; -import { useRef, useState } from "react"; +import { useReducer, useRef, useState } from "react"; import Link from "next/link"; -import { HistoryIcon } from "lucide-react"; +import { HistoryIcon, PauseIcon, PlayIcon, SquareIcon } from "lucide-react"; import { Button } from "@/components/ui/button"; import { SectionDescription } from "@/components/Typography"; import type { ThreadsResponse } from "@/app/api/threads/route"; import type { ThreadsQuery } from "@/app/api/threads/validation"; import { LoadingContent } from "@/components/LoadingContent"; import { runAiRules } from "@/utils/queue/email-actions"; +import { + pauseAiQueue, + resumeAiQueue, + clearAiQueue, +} from "@/utils/queue/ai-queue"; import { sleep } from "@/utils/sleep"; import { toastError } from "@/components/Toast"; import { PremiumAlertWithData, usePremium } from "@/components/PremiumAlert"; import { SetDateDropdown } from "@/app/(app)/[emailAccountId]/assistant/SetDateDropdown"; import { useThreads } from "@/hooks/useThreads"; -import { useAiQueueState } from "@/store/ai-queue"; +import { useBeforeUnload } from "@/hooks/useBeforeUnload"; +import { useAiQueueState, clearAiQueueAtom } from "@/store/ai-queue"; import { Dialog, DialogContent, + DialogDescription, DialogHeader, DialogTitle, DialogTrigger, @@ -27,14 +34,18 @@ import { fetchWithAccount } from "@/utils/fetch"; import { Toggle } from "@/components/Toggle"; import { hasTierAccess } from "@/utils/premium"; import { usePremiumModal } from "@/app/(app)/premium/PremiumModal"; +import { BulkProcessActivityLog } from "@/app/(app)/[emailAccountId]/assistant/BulkProcessActivityLog"; +import { + bulkRunReducer, + getProgressMessage, + initialBulkRunState, +} from "@/app/(app)/[emailAccountId]/assistant/bulk-run-rules-reducer"; export function BulkRunRules() { const { emailAccountId } = useAccount(); const [isOpen, setIsOpen] = useState(false); - const [processedThreadIds, setProcessedThreadIds] = useState>( - new Set(), - ); + const [state, dispatch] = useReducer(bulkRunReducer, initialBulkRunState); const { data, isLoading, error } = useThreads({ type: "inbox" }); @@ -48,21 +59,82 @@ export function BulkRunRules() { minimumTier: "BUSINESS_PLUS_MONTHLY", }); - const [running, setRunning] = useState(false); - const [startDate, setStartDate] = useState(); const [endDate, setEndDate] = useState(); const [includeRead, setIncludeRead] = useState(false); - const [runResult, setRunResult] = useState<{ - count: number; - } | null>(null); const abortRef = useRef<() => void>(undefined); + // Derived state const remaining = new Set( - [...processedThreadIds].filter((id) => queue.has(id)), + [...state.processedThreadIds].filter((id) => queue.has(id)), ).size; - const completed = processedThreadIds.size - remaining; + const completed = state.processedThreadIds.size - remaining; + const isProcessing = queue.size > 0; + const isPaused = state.status === "paused"; + const isBusy = isProcessing || state.status === "processing"; + + // Warn user before leaving page during processing (includes initial fetch) + useBeforeUnload(isBusy); + + const handleStart = async () => { + dispatch({ type: "START" }); + + if (!startDate) { + toastError({ description: "Please select a start date" }); + dispatch({ type: "RESET" }); + return; + } + if (!emailAccountId) { + toastError({ + description: "Email account ID is missing. Please refresh the page.", + }); + dispatch({ type: "RESET" }); + return; + } + + // Ensure queue is not paused from a previous run + resumeAiQueue(); + + try { + abortRef.current = await onRun( + emailAccountId, + { startDate, endDate, includeRead }, + (threads) => { + dispatch({ type: "THREADS_QUEUED", threads }); + }, + (_completionStatus, count) => { + dispatch({ type: "COMPLETE", count }); + }, + ); + } catch (error) { + console.error("Failed to start bulk processing:", error); + toastError({ + title: "Failed to start", + description: "An error occurred. Please try again.", + }); + dispatch({ type: "RESET" }); + } + }; + + const handlePauseResume = () => { + if (isPaused) { + resumeAiQueue(); + dispatch({ type: "RESUME" }); + } else { + pauseAiQueue(); + dispatch({ type: "PAUSE" }); + } + }; + + const handleStop = () => { + dispatch({ type: "STOP", completedCount: completed }); + clearAiQueue(); + clearAiQueueAtom(); + abortRef.current?.(); + }; + + const progressMessage = getProgressMessage(state, remaining); return (
@@ -72,51 +144,45 @@ export function BulkRunRules() { Bulk Process Emails - + - Process Existing Inbox Emails + Bulk Process Emails + + Run your rules on emails in your inbox that haven't been handled + yet. + {data && ( <> - - This runs your rules on {includeRead ? "all" : "unread"}{" "} - emails currently in your inbox (that have not been previously - processed). - - - {processedThreadIds.size > 0 && ( + {progressMessage && (
- {remaining > 0 - ? `Progress: ${completed}/${processedThreadIds.size} emails completed` - : `Success: Processed ${processedThreadIds.size} emails`} + {progressMessage}
)} {hasAiAccess ? ( -
+
{ setStartDate(date); - setRunResult(null); - setProcessedThreadIds(new Set()); + dispatch({ type: "RESET" }); }} value={startDate} placeholder="Set start date" - disabled={running} + disabled={isProcessing} /> { setEndDate(date); - setRunResult(null); - setProcessedThreadIds(new Set()); + dispatch({ type: "RESET" }); }} value={endDate} placeholder="Set end date (optional)" - disabled={running} + disabled={isProcessing} />
@@ -126,7 +192,7 @@ export function BulkRunRules() { label="Include read emails" enabled={includeRead} onChange={(enabled) => setIncludeRead(enabled)} - disabled={running || !isBusinessPlusTier} + disabled={isProcessing || !isBusinessPlusTier} /> {!isBusinessPlusTier && ( - - {running && ( - + /> + )} + + {(state.status === "idle" || + state.status === "stopped") && + !isProcessing && ( + + )} + {isBusy && ( +
+ + +
)} - {runResult && runResult.count === 0 && ( + {state.runResult && state.runResult.count === 0 && (
No {includeRead ? "" : "unread "}emails found in the selected date range. @@ -224,7 +288,7 @@ async function onRun( endDate, includeRead, }: { startDate: Date; endDate?: Date; includeRead?: boolean }, - onThreadsQueued: (threadIds: string[]) => void, + onThreadsQueued: (threads: ThreadsResponse["threads"]) => void, onComplete: ( status: "success" | "error" | "cancelled", count: number, @@ -289,7 +353,7 @@ async function onRun( const threadsWithoutPlan = data.threads.filter((t) => !t.plan); - onThreadsQueued(threadsWithoutPlan.map((t) => t.id)); + onThreadsQueued(threadsWithoutPlan); totalProcessed += threadsWithoutPlan.length; runAiRules(emailAccountId, threadsWithoutPlan, false); diff --git a/apps/web/app/(app)/[emailAccountId]/assistant/bulk-run-rules-reducer.test.ts b/apps/web/app/(app)/[emailAccountId]/assistant/bulk-run-rules-reducer.test.ts new file mode 100644 index 0000000000..76a60707f7 --- /dev/null +++ b/apps/web/app/(app)/[emailAccountId]/assistant/bulk-run-rules-reducer.test.ts @@ -0,0 +1,422 @@ +import { describe, it, expect } from "vitest"; +import { + bulkRunReducer, + getProgressMessage, + initialBulkRunState, + type BulkRunState, +} from "./bulk-run-rules-reducer"; +import type { ThreadsResponse } from "@/app/api/threads/route"; + +// Helper to create mock threads +function createMockThread(id: string): ThreadsResponse["threads"][number] { + return { + id, + snippet: "Test snippet", + messages: [ + { + id: `${id}-msg`, + historyId: "123", + threadId: id, + labelIds: ["INBOX"], + headers: { + from: "test@test.com", + to: "recipient@test.com", + subject: "Test", + date: "2024-01-01", + }, + textPlain: "", + textHtml: "", + snippet: "", + inline: [], + internalDate: "123", + subject: "Test", + date: "2024-01-01", + }, + ], + plan: undefined, + }; +} + +describe("bulkRunReducer", () => { + describe("START action", () => { + it("transitions from idle to processing", () => { + const result = bulkRunReducer(initialBulkRunState, { type: "START" }); + + expect(result.status).toBe("processing"); + expect(result.processedThreadIds.size).toBe(0); + expect(result.fetchedThreads.size).toBe(0); + expect(result.stoppedCount).toBeNull(); + expect(result.runResult).toBeNull(); + }); + + it("clears previous state when starting again", () => { + const thread = createMockThread("thread1"); + const stateWithData: BulkRunState = { + status: "stopped", + processedThreadIds: new Set(["thread1", "thread2"]), + fetchedThreads: new Map([["thread1", thread]]), + stoppedCount: 2, + runResult: { count: 5 }, + }; + + const result = bulkRunReducer(stateWithData, { type: "START" }); + + expect(result.status).toBe("processing"); + expect(result.processedThreadIds.size).toBe(0); + expect(result.fetchedThreads.size).toBe(0); + expect(result.stoppedCount).toBeNull(); + expect(result.runResult).toBeNull(); + }); + }); + + describe("THREADS_QUEUED action", () => { + it("adds thread IDs and threads to state", () => { + const state: BulkRunState = { + ...initialBulkRunState, + status: "processing", + }; + const threads = [ + createMockThread("thread1"), + createMockThread("thread2"), + ]; + + const result = bulkRunReducer(state, { + type: "THREADS_QUEUED", + threads, + }); + + expect(result.processedThreadIds.size).toBe(2); + expect(result.processedThreadIds.has("thread1")).toBe(true); + expect(result.processedThreadIds.has("thread2")).toBe(true); + expect(result.fetchedThreads.size).toBe(2); + expect(result.fetchedThreads.get("thread1")).toBe(threads[0]); + expect(result.fetchedThreads.get("thread2")).toBe(threads[1]); + }); + + it("accumulates threads across multiple calls", () => { + let state: BulkRunState = { + ...initialBulkRunState, + status: "processing", + }; + const threads1 = [ + createMockThread("thread1"), + createMockThread("thread2"), + ]; + const threads2 = [createMockThread("thread3")]; + + state = bulkRunReducer(state, { + type: "THREADS_QUEUED", + threads: threads1, + }); + state = bulkRunReducer(state, { + type: "THREADS_QUEUED", + threads: threads2, + }); + + expect(state.processedThreadIds.size).toBe(3); + expect(state.processedThreadIds.has("thread1")).toBe(true); + expect(state.processedThreadIds.has("thread3")).toBe(true); + expect(state.fetchedThreads.size).toBe(3); + }); + + it("does not duplicate existing thread IDs", () => { + const existingThread = createMockThread("thread1"); + const state: BulkRunState = { + ...initialBulkRunState, + status: "processing", + processedThreadIds: new Set(["thread1"]), + fetchedThreads: new Map([["thread1", existingThread]]), + }; + const newThreads = [ + createMockThread("thread1"), + createMockThread("thread2"), + ]; + + const result = bulkRunReducer(state, { + type: "THREADS_QUEUED", + threads: newThreads, + }); + + expect(result.processedThreadIds.size).toBe(2); + expect(result.fetchedThreads.size).toBe(2); + }); + + it("allows lookup of any queued thread by ID (fixes inbox cache mismatch)", () => { + // This test validates the fix for the bug where threads fetched during + // bulk processing might not exist in the global inbox cache, causing + // activity log entries to be silently skipped. + let state: BulkRunState = { + ...initialBulkRunState, + status: "processing", + }; + + // Simulate multiple batches of threads being fetched (paginated) + const batch1 = [createMockThread("old-thread-1")]; + const batch2 = [createMockThread("old-thread-2")]; + const batch3 = [createMockThread("recent-thread")]; + + state = bulkRunReducer(state, { + type: "THREADS_QUEUED", + threads: batch1, + }); + state = bulkRunReducer(state, { + type: "THREADS_QUEUED", + threads: batch2, + }); + state = bulkRunReducer(state, { + type: "THREADS_QUEUED", + threads: batch3, + }); + + // All threads should be retrievable by ID, even old ones + // that wouldn't be in a typical inbox cache + for (const threadId of state.processedThreadIds) { + const thread = state.fetchedThreads.get(threadId); + expect(thread).toBeDefined(); + expect(thread?.id).toBe(threadId); + } + }); + }); + + describe("COMPLETE action", () => { + it("transitions to idle when count is 0 (no emails found)", () => { + const state: BulkRunState = { + ...initialBulkRunState, + status: "processing", + }; + + const result = bulkRunReducer(state, { type: "COMPLETE", count: 0 }); + + expect(result.status).toBe("idle"); + expect(result.runResult).toEqual({ count: 0 }); + }); + + it("stays in processing state when count > 0", () => { + const state: BulkRunState = { + ...initialBulkRunState, + status: "processing", + processedThreadIds: new Set(["thread1"]), + }; + + const result = bulkRunReducer(state, { type: "COMPLETE", count: 5 }); + + expect(result.status).toBe("processing"); + }); + + it("does not override stopped status", () => { + const state: BulkRunState = { + ...initialBulkRunState, + status: "stopped", + stoppedCount: 3, + }; + + const result = bulkRunReducer(state, { type: "COMPLETE", count: 0 }); + + expect(result.status).toBe("stopped"); + expect(result.stoppedCount).toBe(3); + }); + + it("preserves paused status when count > 0", () => { + const state: BulkRunState = { + ...initialBulkRunState, + status: "paused", + processedThreadIds: new Set(["thread1"]), + }; + + const result = bulkRunReducer(state, { type: "COMPLETE", count: 5 }); + + expect(result.status).toBe("paused"); + }); + }); + + describe("PAUSE action", () => { + it("transitions from processing to paused", () => { + const state: BulkRunState = { + ...initialBulkRunState, + status: "processing", + }; + + const result = bulkRunReducer(state, { type: "PAUSE" }); + + expect(result.status).toBe("paused"); + }); + + it("does nothing if not in processing state", () => { + const state: BulkRunState = { + ...initialBulkRunState, + status: "idle", + }; + + const result = bulkRunReducer(state, { type: "PAUSE" }); + + expect(result.status).toBe("idle"); + }); + + it("does nothing if already paused", () => { + const state: BulkRunState = { + ...initialBulkRunState, + status: "paused", + }; + + const result = bulkRunReducer(state, { type: "PAUSE" }); + + expect(result.status).toBe("paused"); + }); + }); + + describe("RESUME action", () => { + it("transitions from paused to processing", () => { + const state: BulkRunState = { + ...initialBulkRunState, + status: "paused", + }; + + const result = bulkRunReducer(state, { type: "RESUME" }); + + expect(result.status).toBe("processing"); + }); + + it("does nothing if not in paused state", () => { + const state: BulkRunState = { + ...initialBulkRunState, + status: "processing", + }; + + const result = bulkRunReducer(state, { type: "RESUME" }); + + expect(result.status).toBe("processing"); + }); + }); + + describe("STOP action", () => { + it("transitions to stopped and captures completed count", () => { + const state: BulkRunState = { + ...initialBulkRunState, + status: "processing", + processedThreadIds: new Set(["t1", "t2", "t3", "t4", "t5"]), + }; + + const result = bulkRunReducer(state, { + type: "STOP", + completedCount: 3, + }); + + expect(result.status).toBe("stopped"); + expect(result.stoppedCount).toBe(3); + }); + + it("does not override if already stopped", () => { + const state: BulkRunState = { + ...initialBulkRunState, + status: "stopped", + stoppedCount: 5, + }; + + const result = bulkRunReducer(state, { + type: "STOP", + completedCount: 10, + }); + + expect(result.status).toBe("stopped"); + expect(result.stoppedCount).toBe(5); + }); + + it("works when stopping from paused state", () => { + const state: BulkRunState = { + ...initialBulkRunState, + status: "paused", + }; + + const result = bulkRunReducer(state, { + type: "STOP", + completedCount: 8, + }); + + expect(result.status).toBe("stopped"); + expect(result.stoppedCount).toBe(8); + }); + }); + + describe("RESET action", () => { + it("resets all state to initial values", () => { + const thread = createMockThread("t1"); + const state: BulkRunState = { + status: "stopped", + processedThreadIds: new Set(["t1", "t2"]), + fetchedThreads: new Map([["t1", thread]]), + stoppedCount: 5, + runResult: { count: 10 }, + }; + + const result = bulkRunReducer(state, { type: "RESET" }); + + expect(result.status).toBe("idle"); + expect(result.processedThreadIds.size).toBe(0); + expect(result.fetchedThreads.size).toBe(0); + expect(result.stoppedCount).toBeNull(); + expect(result.runResult).toBeNull(); + }); + }); +}); + +describe("getProgressMessage", () => { + it("returns null when no emails processed", () => { + const state: BulkRunState = { + ...initialBulkRunState, + status: "processing", + }; + + const result = getProgressMessage(state, 0); + + expect(result).toBeNull(); + }); + + it("shows progress during processing with remaining items", () => { + const state: BulkRunState = { + ...initialBulkRunState, + status: "processing", + processedThreadIds: new Set(["t1", "t2", "t3", "t4", "t5"]), + }; + + const result = getProgressMessage(state, 3); + + expect(result).toBe("Progress: 2/5 emails completed"); + }); + + it("shows stoppedCount after stop", () => { + const state: BulkRunState = { + ...initialBulkRunState, + status: "stopped", + processedThreadIds: new Set(["t1", "t2", "t3", "t4", "t5"]), + stoppedCount: 3, + }; + + const result = getProgressMessage(state, 0); + + expect(result).toBe("Processed 3 emails"); + }); + + it("shows total on completion", () => { + const state: BulkRunState = { + ...initialBulkRunState, + status: "idle", + processedThreadIds: new Set(["t1", "t2", "t3", "t4", "t5"]), + }; + + const result = getProgressMessage(state, 0); + + expect(result).toBe("Processed 5 emails"); + }); + + it("shows progress when paused", () => { + const state: BulkRunState = { + ...initialBulkRunState, + status: "paused", + processedThreadIds: new Set(["t1", "t2", "t3", "t4"]), + }; + + const result = getProgressMessage(state, 2); + + expect(result).toBe("Progress: 2/4 emails completed"); + }); +}); diff --git a/apps/web/app/(app)/[emailAccountId]/assistant/bulk-run-rules-reducer.ts b/apps/web/app/(app)/[emailAccountId]/assistant/bulk-run-rules-reducer.ts new file mode 100644 index 0000000000..3949890daf --- /dev/null +++ b/apps/web/app/(app)/[emailAccountId]/assistant/bulk-run-rules-reducer.ts @@ -0,0 +1,127 @@ +import type { ThreadsResponse } from "@/app/api/threads/route"; + +type Thread = ThreadsResponse["threads"][number]; + +export type ProcessingStatus = "idle" | "processing" | "paused" | "stopped"; + +export type BulkRunState = { + status: ProcessingStatus; + processedThreadIds: Set; + // Stores fetched threads to ensure activity log can find them + // (the global inbox cache may not contain all fetched threads) + fetchedThreads: Map; + stoppedCount: number | null; + runResult: { count: number } | null; +}; + +export type BulkRunAction = + | { type: "START" } + | { type: "THREADS_QUEUED"; threads: Thread[] } + | { type: "COMPLETE"; count: number } + | { type: "PAUSE" } + | { type: "RESUME" } + | { type: "STOP"; completedCount: number } + | { type: "RESET" }; + +export const initialBulkRunState: BulkRunState = { + status: "idle", + processedThreadIds: new Set(), + fetchedThreads: new Map(), + stoppedCount: null, + runResult: null, +}; + +export function bulkRunReducer( + state: BulkRunState, + action: BulkRunAction, +): BulkRunState { + switch (action.type) { + case "START": + return { + ...state, + status: "processing", + processedThreadIds: new Set(), + fetchedThreads: new Map(), + stoppedCount: null, + runResult: null, + }; + + case "THREADS_QUEUED": { + const nextIds = new Set(state.processedThreadIds); + const nextThreads = new Map(state.fetchedThreads); + for (const thread of action.threads) { + nextIds.add(thread.id); + nextThreads.set(thread.id, thread); + } + return { + ...state, + processedThreadIds: nextIds, + fetchedThreads: nextThreads, + }; + } + + case "COMPLETE": + // Don't override stopped status + if (state.status === "stopped") return state; + + // No emails found - go back to idle + if (action.count === 0) { + return { + ...state, + status: "idle", + runResult: { count: 0 }, + }; + } + + // Keep current status (processing or paused) + return state; + + case "PAUSE": + if (state.status !== "processing") return state; + return { + ...state, + status: "paused", + }; + + case "RESUME": + if (state.status !== "paused") return state; + return { + ...state, + status: "processing", + }; + + case "STOP": + // Don't override if already stopped + if (state.status === "stopped") return state; + return { + ...state, + status: "stopped", + stoppedCount: action.completedCount, + }; + + case "RESET": + return initialBulkRunState; + + default: + return state; + } +} + +export function getProgressMessage( + state: BulkRunState, + remaining: number, +): string | null { + if (state.processedThreadIds.size === 0) return null; + + const completed = state.processedThreadIds.size - remaining; + + if (remaining > 0) { + return `Progress: ${completed}/${state.processedThreadIds.size} emails completed`; + } + + if (state.status === "stopped" && state.stoppedCount !== null) { + return `Processed ${state.stoppedCount} emails`; + } + + return `Processed ${state.processedThreadIds.size} emails`; +} diff --git a/apps/web/app/(landing)/components/page.tsx b/apps/web/app/(landing)/components/page.tsx index 0c94dd6ada..66ec96fdef 100644 --- a/apps/web/app/(landing)/components/page.tsx +++ b/apps/web/app/(landing)/components/page.tsx @@ -39,6 +39,10 @@ import { ResultsDisplay, ResultDisplayContent, } from "@/app/(app)/[emailAccountId]/assistant/ResultDisplay"; +import { + ActivityLog, + type ActivityLogEntry, +} from "@/app/(app)/[emailAccountId]/assistant/BulkProcessActivityLog"; export const maxDuration = 3; @@ -553,6 +557,76 @@ export default function Components() {
+
+
ActivityLog
+
+

+ Default with mixed states: +

+ + +

Paused state:

+ + +

+ Long text truncation test: +

+ ', + subject: + "This is an extremely long subject line that should definitely truncate properly when displayed in the activity log component - it just keeps going and going with more text", + status: "completed", + ruleName: "Newsletter", + }, + { + id: "long-2", + from: "Short ", + subject: "Short subject", + status: "processing", + }, + ]} + processingCount={1} + /> + +

All completed:

+ ", + subject: "Meeting notes", + status: "completed", + ruleName: "Work", + }, + { + id: "done-2", + from: "Bob ", + subject: "Project update", + status: "completed", + ruleName: "FYI", + }, + { + id: "done-3", + from: "Newsletter ", + subject: "Weekly digest", + status: "completed", + }, + ]} + processingCount={0} + /> +
+
+
MultiSelectFilter
@@ -714,3 +788,41 @@ function getRuleWithName(name: string): Rule { name, }; } + +function getActivityLogEntries(): ActivityLogEntry[] { + return [ + { + id: "1", + from: "Lenny's Newsletter ", + subject: "How Zapier's EA built an army of AI interns", + status: "completed", + ruleName: "Newsletter", + }, + { + id: "2", + from: "ZenDaily ", + subject: "🔮 ZenDaily - 15th Dec 2025 🔮", + status: "processing", + ruleName: "Newsletter", + }, + { + id: "3", + from: "Elie Steinbock ", + subject: "talk tomorrow", + status: "processing", + }, + { + id: "4", + from: "Morning Brew ", + subject: "☕ Gathering storm", + status: "waiting", + }, + { + id: "5", + from: "GitHub ", + subject: "PR review requested", + status: "completed", + ruleName: "To Review", + }, + ]; +} diff --git a/apps/web/hooks/useBeforeUnload.ts b/apps/web/hooks/useBeforeUnload.ts new file mode 100644 index 0000000000..38d7d93ad0 --- /dev/null +++ b/apps/web/hooks/useBeforeUnload.ts @@ -0,0 +1,21 @@ +import { useEffect } from "react"; + +/** + * Shows a browser confirmation dialog when the user tries to leave the page. + * @param enabled - Whether to show the warning (e.g., when there's unsaved work) + */ +export function useBeforeUnload(enabled: boolean) { + useEffect(() => { + if (!enabled) return; + + const handleBeforeUnload = (e: BeforeUnloadEvent) => { + e.preventDefault(); + // Required for cross-browser compatibility (Safari needs returnValue set) + e.returnValue = ""; + return ""; + }; + + window.addEventListener("beforeunload", handleBeforeUnload); + return () => window.removeEventListener("beforeunload", handleBeforeUnload); + }, [enabled]); +} diff --git a/apps/web/store/ai-queue.ts b/apps/web/store/ai-queue.ts index 90cec70c4c..ad13e2944c 100644 --- a/apps/web/store/ai-queue.ts +++ b/apps/web/store/ai-queue.ts @@ -26,6 +26,10 @@ export const removeFromAiQueueAtom = (removeId: string) => { }); }; +export const clearAiQueueAtom = () => { + jotaiStore.set(aiQueueAtom, new Set([])); +}; + const isInAiQueueAtom = atom((get) => { const ids = get(aiQueueAtom); return (id: string) => ids.has(id); diff --git a/apps/web/utils/queue/ai-queue.ts b/apps/web/utils/queue/ai-queue.ts index a89a2fa667..d45db2036c 100644 --- a/apps/web/utils/queue/ai-queue.ts +++ b/apps/web/utils/queue/ai-queue.ts @@ -2,5 +2,10 @@ import PQueue from "p-queue"; -// Avoid overwhelming AI API -export const aiQueue = new PQueue({ concurrency: 1 }); +// Process multiple AI requests in parallel for faster bulk operations +export const aiQueue = new PQueue({ concurrency: 3 }); + +export const pauseAiQueue = () => aiQueue.pause(); +export const resumeAiQueue = () => aiQueue.start(); +export const clearAiQueue = () => aiQueue.clear(); +export const isAiQueuePaused = () => aiQueue.isPaused;