diff --git a/.cursor/rules/features/delayed-actions.mdc b/.cursor/rules/features/delayed-actions.mdc new file mode 100644 index 0000000000..2361a8142f --- /dev/null +++ b/.cursor/rules/features/delayed-actions.mdc @@ -0,0 +1,218 @@ +# Delayed Actions Feature + +## Overview + +The delayed actions feature allows users to schedule email actions (like labeling, archiving, or replying) to be executed after a specified delay period. This is useful for scenarios like: + +- **Follow-up reminders**: Label emails that haven't been replied to after X days +- **Snooze functionality**: Archive emails and bring them back later +- **Time-sensitive processing**: Apply actions only after a waiting period + +## Implementation Architecture + +### Core Components + +1. **Action Delay Configuration** + - `Action.delayInMinutes` field: Optional delay from 1 minute to 90 days + - UI controls in `RuleForm.tsx` for setting delays + - Validation ensures delays are within acceptable bounds + +2. **Scheduled Action Storage** + - `ScheduledAction` model: Stores pending delayed actions + - Contains action details, timing, and execution status + - Links to `ExecutedRule` for context and audit trail + +3. **QStash Integration** + - Uses Upstash QStash for reliable message queuing + - Replaces cron-based polling with event-driven execution + - Provides built-in retries and error handling + +### Database Schema + +```prisma +model ScheduledAction { + id String @id @default(cuid()) + executedRuleId String + actionType ActionType + messageId String + threadId String + scheduledFor DateTime + emailAccountId String + status ScheduledActionStatus @default(PENDING) + + // Action-specific fields + label String? + subject String? + content String? + to String? + cc String? + bcc String? + url String? + + // QStash integration + scheduledId String? + + // Execution tracking + executedAt DateTime? + executedActionId String? @unique + + // Relationships and indexes... +} +``` + +## QStash Integration + +### Scheduling Process + +1. **Rule Execution**: When a rule matches an email, actions are split into: + - **Immediate actions**: Executed right away + - **Delayed actions**: Scheduled via QStash + +2. **QStash Scheduling**: + ```typescript + const notBefore = getUnixTime(addMinutes(new Date(), delayInMinutes)); + + const response = await qstash.publishJSON({ + url: `${process.env.NEXTAUTH_URL}/api/scheduled-actions/execute`, + body: { + scheduledActionId: scheduledAction.id, + }, + notBefore, // Unix timestamp for when to execute + deduplicationId: `scheduled-action-${scheduledAction.id}`, + }); + ``` + +3. **Deduplication**: Uses unique IDs to prevent duplicate execution +4. **Message ID Storage**: QStash scheduledId stored for efficient cancellation (field: scheduledId) + +### Execution Process + +1. **QStash Delivery**: QStash delivers message to `/api/scheduled-actions/execute` +2. **Signature Verification**: Validates QStash signature for security +3. **Action Execution**: + - Retrieves scheduled action from database + - Validates email still exists + - Executes the specific action using `runActionFunction` + - Updates execution status + +### Benefits Over Cron-Based Approach + +- **Reliability**: No polling, exact scheduling, built-in retries +- **Scalability**: No background processes, QStash handles infrastructure +- **Deduplication**: Prevents duplicate execution with unique IDs +- **Monitoring**: Better observability through QStash dashboard +- **Cancellation**: Direct message cancellation using stored message IDs + +## Key Functions + +### Core Scheduling Functions + +```typescript +// Create and schedule a single delayed action +export async function createScheduledAction({ + executedRuleId, + actionItem, + messageId, + threadId, + emailAccountId, + scheduledFor, +}) + +// Schedule multiple delayed actions for a rule execution +export async function scheduleDelayedActions({ + executedRuleId, + actionItems, + messageId, + threadId, + emailAccountId, +}) + +// Cancel existing scheduled actions (e.g., when new rule overrides) +export async function cancelScheduledActions({ + emailAccountId, + messageId, + threadId, + reason, +}) +``` + +### Usage in Rule Execution + +```typescript +// In run-rules.ts +// Cancel any existing scheduled actions for this message +await cancelScheduledActions({ + emailAccountId: emailAccount.id, + messageId: message.id, + threadId: message.threadId, + reason: "Superseded by new rule execution", +}); + +// Schedule delayed actions if any exist +if (executedRule && delayedActions.length > 0 && !isTest) { + await scheduleDelayedActions({ + executedRuleId: executedRule.id, + actionItems: delayedActions, + messageId: message.id, + threadId: message.threadId, + emailAccountId: emailAccount.id, + }); +} +``` + +## Migration Safety + +The database migration includes `IF NOT EXISTS` clauses to prevent conflicts: + +```sql +-- CreateEnum +CREATE TYPE IF NOT EXISTS "ScheduledActionStatus" AS ENUM ('PENDING', 'EXECUTING', 'COMPLETED', 'FAILED', 'CANCELLED'); + +-- AlterTable +ALTER TABLE "Action" ADD COLUMN IF NOT EXISTS "delayInMinutes" INTEGER; + +-- CreateTable +CREATE TABLE IF NOT EXISTS "ScheduledAction" ( + -- table definition +); + +-- CreateIndex +CREATE UNIQUE INDEX IF NOT EXISTS "ScheduledAction_executedActionId_key" ON "ScheduledAction"("executedActionId"); +``` + +## Usage Examples + +### Basic Delay Configuration +```typescript +// In rule action configuration +{ + type: "LABEL", + label: "Follow-up Needed", + delayInMinutes: 2880 // 2 days +} +``` + +### Follow-up Workflow +1. Email arrives and matches rule +2. Immediate action: Archive email +3. Delayed action: Label as "Follow-up" after 3 days +4. If user replies before 3 days, action can be cancelled + +## API Endpoints + +- `POST /api/scheduled-actions/execute`: QStash webhook for execution +- `DELETE /api/admin/scheduled-actions/[id]/cancel`: Cancel scheduled action +- `POST /api/admin/scheduled-actions/[id]/retry`: Retry failed action + +## Error Handling + +- **Email Not Found**: Action marked as completed with reason +- **Execution Failure**: Action marked as failed, logged for debugging +- **Cancellation**: QStash message cancelled, database updated +- **Retries**: QStash automatically retries failed deliveries + +## Monitoring + +- Database status tracking: PENDING → EXECUTING → COMPLETED/FAILED +- QStash dashboard for message delivery monitoring +- Structured logging for debugging and observability diff --git a/apps/web/__tests__/ai-choose-rule.test.ts b/apps/web/__tests__/ai-choose-rule.test.ts index 2baa6c4920..b021841e07 100644 --- a/apps/web/__tests__/ai-choose-rule.test.ts +++ b/apps/web/__tests__/ai-choose-rule.test.ts @@ -76,6 +76,7 @@ describe.runIf(isAiTest)("aiChooseRule", () => { cc: null, bcc: null, url: null, + delayInMinutes: null, }, ]); diff --git a/apps/web/app/(app)/[emailAccountId]/assistant/DraftReplies.tsx b/apps/web/app/(app)/[emailAccountId]/assistant/DraftReplies.tsx index cb9a307fb0..a6ce2078cd 100644 --- a/apps/web/app/(app)/[emailAccountId]/assistant/DraftReplies.tsx +++ b/apps/web/app/(app)/[emailAccountId]/assistant/DraftReplies.tsx @@ -92,6 +92,7 @@ function useDraftReplies() { cc: null, bcc: null, url: null, + delayInMinutes: null, createdAt: new Date(), updatedAt: new Date(), }, diff --git a/apps/web/app/(app)/[emailAccountId]/assistant/RuleDialog.tsx b/apps/web/app/(app)/[emailAccountId]/assistant/RuleDialog.tsx index cf7ca1c392..4e1712b6c6 100644 --- a/apps/web/app/(app)/[emailAccountId]/assistant/RuleDialog.tsx +++ b/apps/web/app/(app)/[emailAccountId]/assistant/RuleDialog.tsx @@ -28,7 +28,7 @@ export function RuleDialog({ initialRule, editMode = true, }: RuleDialogProps) { - const { data, isLoading, error } = useRule(ruleId || ""); + const { data, isLoading, error, mutate } = useRule(ruleId || ""); const handleSuccess = () => { onSuccess?.(); @@ -52,6 +52,7 @@ export function RuleDialog({ alwaysEditMode={editMode} onSuccess={handleSuccess} isDialog={true} + mutate={mutate} /> )} diff --git a/apps/web/app/(app)/[emailAccountId]/assistant/RuleForm.tsx b/apps/web/app/(app)/[emailAccountId]/assistant/RuleForm.tsx index 8232efc3e2..626063ecb8 100644 --- a/apps/web/app/(app)/[emailAccountId]/assistant/RuleForm.tsx +++ b/apps/web/app/(app)/[emailAccountId]/assistant/RuleForm.tsx @@ -89,6 +89,8 @@ import { } from "@/components/ui/select"; import { Form, FormControl, FormField, FormItem } from "@/components/ui/form"; import { isDefined } from "@/utils/types"; +import { canActionBeDelayed } from "@/utils/delayed-actions"; +import { useDelayedActionsEnabled } from "@/hooks/useFeatureFlags"; export function Rule({ ruleId, @@ -97,11 +99,17 @@ export function Rule({ ruleId: string; alwaysEditMode?: boolean; }) { - const { data, isLoading, error } = useRule(ruleId); + const { data, isLoading, error, mutate } = useRule(ruleId); return ( - {data && } + {data && ( + + )} ); } @@ -111,11 +119,13 @@ export function RuleForm({ alwaysEditMode = false, onSuccess, isDialog = false, + mutate, }: { rule: CreateRuleBody & { id?: string }; alwaysEditMode?: boolean; onSuccess?: () => void; isDialog?: boolean; + mutate?: (data?: any, options?: any) => void; }) { const { emailAccountId } = useAccount(); const digestEnabled = useDigestEnabled(); @@ -128,6 +138,7 @@ export function RuleForm({ actions: [ ...rule.actions.map((action) => ({ ...action, + delayInMinutes: action.delayInMinutes, content: { ...action.content, setManually: !!action.content?.value, @@ -158,7 +169,7 @@ export function RuleForm({ }); const { append, remove } = useFieldArray({ control, name: "actions" }); - const { userLabels, isLoading, mutate } = useLabels(); + const { userLabels, isLoading, mutate: mutateLabels } = useLabels(); const { categories, isLoading: categoriesLoading, @@ -194,6 +205,21 @@ export function RuleForm({ } if (data.id) { + if (mutate) { + // mutate delayInMinutes optimistically to keep the UI consistent + // in case the modal is reopened immediately after saving + const optimisticData = { + rule: { + ...rule, + actions: rule.actions.map((action, index) => ({ + ...action, + delayInMinutes: data.actions[index]?.delayInMinutes, + })), + }, + }; + mutate(optimisticData, false); + } + const res = await updateRuleAction(emailAccountId, { ...data, id: data.id, @@ -202,12 +228,16 @@ export function RuleForm({ if (res?.serverError) { console.error(res); toastError({ description: res.serverError }); + if (mutate) mutate(); } else if (!res?.data?.rule) { toastError({ description: "There was an error updating the rule.", }); + if (mutate) mutate(); } else { toastSuccess({ description: "Saved!" }); + // Revalidate to get the real data from server + if (mutate) mutate(); posthog.capture("User updated AI rule", { conditions: data.conditions.map((condition) => condition.type), actions: data.actions.map((action) => action.type), @@ -252,7 +282,16 @@ export function RuleForm({ } } }, - [userLabels, router, posthog, emailAccountId, isDialog, onSuccess], + [ + userLabels, + router, + posthog, + emailAccountId, + isDialog, + onSuccess, + mutate, + rule, + ], ); const conditions = watch("conditions"); @@ -292,9 +331,9 @@ export function RuleForm({ { label: "Forward", value: ActionType.FORWARD }, { label: "Mark read", value: ActionType.MARK_READ }, { label: "Mark spam", value: ActionType.MARK_SPAM }, + ...(digestEnabled ? [{ label: "Digest", value: ActionType.DIGEST }] : []), { label: "Call webhook", value: ActionType.CALL_WEBHOOK }, { label: "Auto-update reply label", value: ActionType.TRACK_THREAD }, - ...(digestEnabled ? [{ label: "Digest", value: ActionType.DIGEST }] : []), ]; }, [digestEnabled]); @@ -789,7 +828,7 @@ export function RuleForm({ errors={errors} userLabels={userLabels} isLoading={isLoading} - mutate={mutate} + mutate={mutateLabels} emailAccountId={emailAccountId} remove={remove} typeOptions={typeOptions} @@ -820,9 +859,10 @@ export function RuleForm({ )} -
+
@@ -948,6 +988,7 @@ function ActionCard({ }) { const fields = actionInputs[action.type].fields; const [expandedFields, setExpandedFields] = useState(false); + const delayedActionsEnabled = useDelayedActionsEnabled(); // Get expandable fields that should be visible regardless of expanded state const hasExpandableFields = fields.some((field) => field.expandable); @@ -958,6 +999,14 @@ function ActionCard({ ? !!watch(`actions.${index}.content.setManually`) : false; + const actionCanBeDelayed = useMemo( + () => delayedActionsEnabled && canActionBeDelayed(action.type), + [action.type, delayedActionsEnabled], + ); + + const delayValue = watch(`actions.${index}.delayInMinutes`); + const delayEnabled = !!delayValue; + // Helper function to determine if a field can use variables based on context const canFieldUseVariables = ( field: { name: string; expandable?: boolean }, @@ -1057,10 +1106,12 @@ function ActionCard({ >
) : (
-
)} @@ -1184,9 +1247,48 @@ function ActionCard({ })} {action.type === ActionType.TRACK_THREAD && } - - {/* Show the variable pro tip when action has visible fields that can use variables */} {shouldShowProTip && } + {actionCanBeDelayed && ( +
+
+
+ {delayEnabled && ( + + )} +
+ +
+ + { + const newValue = enabled ? 60 : null; + setValue(`actions.${index}.delayInMinutes`, newValue, { + shouldValidate: true, + }); + }} + /> +
+
+ + {errors?.actions?.[index]?.delayInMinutes && ( +
+ +
+ )} +
+ )} {hasExpandableFields && (
@@ -1295,6 +1397,7 @@ export function ThreadsExplanation({ size }: { size: "sm" | "md" }) { return ( ); @@ -1355,3 +1458,102 @@ function VariableProTip() {
); } + +function DelayInputControls({ + index, + delayInMinutes, + setValue, +}: { + index: number; + delayInMinutes: number | null | undefined; + setValue: ReturnType>["setValue"]; +}) { + const { value: displayValue, unit } = getDisplayValueAndUnit(delayInMinutes); + + const handleValueChange = (newValue: string, currentUnit: string) => { + const minutes = convertToMinutes(newValue, currentUnit); + setValue(`actions.${index}.delayInMinutes`, minutes, { + shouldValidate: true, + }); + }; + + const handleUnitChange = (newUnit: string) => { + if (displayValue) { + const minutes = convertToMinutes(displayValue, newUnit); + setValue(`actions.${index}.delayInMinutes`, minutes); + } + }; + + const delayConfig = { + displayValue, + unit, + handleValueChange, + handleUnitChange, + }; + + return ( +
+
+ ); +} + +// minutes to user-friendly UI format +function getDisplayValueAndUnit(minutes: number | null | undefined) { + if (minutes === null || minutes === undefined) + return { value: "", unit: "hours" }; + if (minutes === -1 || minutes <= 0) return { value: "", unit: "hours" }; + + if (minutes >= 1440 && minutes % 1440 === 0) { + return { value: (minutes / 1440).toString(), unit: "days" }; + } else if (minutes >= 60 && minutes % 60 === 0) { + return { value: (minutes / 60).toString(), unit: "hours" }; + } else { + return { value: minutes.toString(), unit: "minutes" }; + } +} + +// user-friendly UI format to minutes +function convertToMinutes(value: string, unit: string) { + const numValue = Number.parseInt(value, 10); + if (Number.isNaN(numValue) || numValue <= 0) return -1; + + switch (unit) { + case "minutes": + return numValue; + case "hours": + return numValue * 60; + case "days": + return numValue * 1440; + default: + return numValue; + } +} diff --git a/apps/web/app/(app)/[emailAccountId]/assistant/Rules.tsx b/apps/web/app/(app)/[emailAccountId]/assistant/Rules.tsx index ea6a4e53d3..c7f832c412 100644 --- a/apps/web/app/(app)/[emailAccountId]/assistant/Rules.tsx +++ b/apps/web/app/(app)/[emailAccountId]/assistant/Rules.tsx @@ -133,6 +133,7 @@ export function Rules({ size = "md" }: { size?: "sm" | "md" }) { cc: null, bcc: null, url: null, + delayInMinutes: null, }, showArchiveAction ? { @@ -148,6 +149,7 @@ export function Rules({ size = "md" }: { size?: "sm" | "md" }) { cc: null, bcc: null, url: null, + delayInMinutes: null, } : null, emailAccountData?.coldEmailDigest @@ -164,6 +166,7 @@ export function Rules({ size = "md" }: { size?: "sm" | "md" }) { cc: null, bcc: null, url: null, + delayInMinutes: null, } : null, ].filter(isDefined), diff --git a/apps/web/app/(app)/[emailAccountId]/assistant/onboarding/CategoriesSetup.tsx b/apps/web/app/(app)/[emailAccountId]/assistant/onboarding/CategoriesSetup.tsx index ce6c6cf762..834e0c4ca5 100644 --- a/apps/web/app/(app)/[emailAccountId]/assistant/onboarding/CategoriesSetup.tsx +++ b/apps/web/app/(app)/[emailAccountId]/assistant/onboarding/CategoriesSetup.tsx @@ -34,6 +34,7 @@ import { } from "@/app/(app)/[emailAccountId]/assistant/onboarding/ExampleDialog"; import { categoryConfig } from "@/utils/category-config"; import { useAccount } from "@/providers/EmailAccountProvider"; +import { useDelayedActionsEnabled } from "@/hooks/useFeatureFlags"; const NEXT_URL = "/assistant/onboarding/draft-replies"; @@ -162,6 +163,8 @@ function CategoryCard({ form: ReturnType>; tooltipText?: string; }) { + const delayedActionsEnabled = useDelayedActionsEnabled(); + return ( @@ -205,8 +208,13 @@ function CategoryCard({ Label - Label + Skip Inbox + Label + skip inbox + {delayedActionsEnabled && ( + + Label + archive after a week + + )} None diff --git a/apps/web/app/api/scheduled-actions/execute/route.ts b/apps/web/app/api/scheduled-actions/execute/route.ts new file mode 100644 index 0000000000..d21ac7b00b --- /dev/null +++ b/apps/web/app/api/scheduled-actions/execute/route.ts @@ -0,0 +1,109 @@ +import { verifySignatureAppRouter } from "@upstash/qstash/nextjs"; +import type { NextRequest } from "next/server"; +import { z } from "zod"; +import { withError } from "@/utils/middleware"; +import { createScopedLogger } from "@/utils/logger"; +import { markQStashActionAsExecuting } from "@/utils/scheduled-actions/scheduler"; +import { executeScheduledAction } from "@/utils/scheduled-actions/executor"; +import prisma from "@/utils/prisma"; +import { ScheduledActionStatus } from "@prisma/client"; + +const logger = createScopedLogger("scheduled-actions-executor"); + +export const maxDuration = 300; // 5 minutes + +const scheduledActionBody = z.object({ + scheduledActionId: z.string().min(1, "Scheduled action ID is required"), +}); + +export const POST = verifySignatureAppRouter( + withError(async (request: NextRequest) => { + try { + logger.info("QStash request received", { + url: request.url, + method: request.method, + headers: Object.fromEntries(request.headers.entries()), + }); + + const rawPayload = await request.json(); + const validationResult = scheduledActionBody.safeParse(rawPayload); + + if (!validationResult.success) { + logger.error("Invalid payload structure", { + errors: validationResult.error.errors, + receivedPayload: rawPayload, + }); + return new Response("Invalid payload structure", { status: 400 }); + } + + const payload = validationResult.data; + + logger.info("Received QStash scheduled action execution request", { + scheduledActionId: payload.scheduledActionId, + payload, + }); + + const scheduledAction = await prisma.scheduledAction.findUnique({ + where: { id: payload.scheduledActionId }, + include: { + emailAccount: true, + executedRule: true, + }, + }); + + if (!scheduledAction) { + logger.warn("Scheduled action not found", { + scheduledActionId: payload.scheduledActionId, + }); + return new Response("Scheduled action not found", { status: 404 }); + } + + // Check if action is still pending (might have been cancelled) + if (scheduledAction.status === ScheduledActionStatus.CANCELLED) { + logger.info("Scheduled action was cancelled, skipping execution", { + scheduledActionId: payload.scheduledActionId, + }); + return new Response("Action was cancelled", { status: 200 }); + } + + if (scheduledAction.status !== ScheduledActionStatus.PENDING) { + logger.warn("Scheduled action is not in pending status", { + scheduledActionId: payload.scheduledActionId, + status: scheduledAction.status, + }); + return new Response("Action is not pending", { status: 200 }); + } + + // Mark as executing to prevent duplicate processing + const markedAction = await markQStashActionAsExecuting( + scheduledAction.id, + ); + if (!markedAction) { + logger.warn("Action already being processed or completed", { + scheduledActionId: scheduledAction.id, + }); + return new Response("Action already being processed", { status: 200 }); + } + + // Execute the action + const executionResult = await executeScheduledAction(scheduledAction); + + if (executionResult.success) { + logger.info("Successfully executed QStash scheduled action", { + scheduledActionId: scheduledAction.id, + executedActionId: executionResult.executedActionId, + }); + return new Response("Action executed successfully", { status: 200 }); + } else { + logger.error("Failed to execute QStash scheduled action", { + scheduledActionId: scheduledAction.id, + error: executionResult.error, + }); + return new Response("Action execution failed", { status: 500 }); + } + } catch (error) { + logger.error("QStash scheduled action execution failed", { error }); + return new Response("Internal server error", { status: 500 }); + } + }), +); diff --git a/apps/web/components/TooltipExplanation.tsx b/apps/web/components/TooltipExplanation.tsx index f7affd0eeb..caf531d79d 100644 --- a/apps/web/components/TooltipExplanation.tsx +++ b/apps/web/components/TooltipExplanation.tsx @@ -1,7 +1,15 @@ +"use client"; + import { HelpCircleIcon } from "lucide-react"; -import { Tooltip } from "@/components/Tooltip"; import { cva, type VariantProps } from "class-variance-authority"; import { cn } from "@/utils"; +import { + Tooltip, + TooltipContent, + TooltipProvider, + TooltipTrigger, +} from "@/components/ui/tooltip"; +import { useState, useCallback } from "react"; const tooltipIconVariants = cva("cursor-pointer", { variants: { @@ -19,18 +27,34 @@ interface TooltipExplanationProps extends React.HTMLAttributes, VariantProps { text: string; + side?: "top" | "right" | "bottom" | "left"; } export function TooltipExplanation({ text, size, className, + side = "top", }: TooltipExplanationProps) { + const [isOpen, setIsOpen] = useState(false); + + const handleClick = useCallback(() => { + setIsOpen((prev) => !prev); + }, []); + return ( - - - + + + + + + +

{text}

+
+
+
); } diff --git a/apps/web/components/ui/tooltip.tsx b/apps/web/components/ui/tooltip.tsx index fbd81fe0d3..4057f0915e 100644 --- a/apps/web/components/ui/tooltip.tsx +++ b/apps/web/components/ui/tooltip.tsx @@ -1,31 +1,61 @@ "use client"; -import * as React from "react"; +import type * as React from "react"; import * as TooltipPrimitive from "@radix-ui/react-tooltip"; import { cn } from "@/utils"; -const TooltipProvider = TooltipPrimitive.Provider; +function TooltipProvider({ + delayDuration = 0, + ...props +}: React.ComponentProps) { + return ( + + ); +} -const Tooltip = TooltipPrimitive.Root; +function Tooltip({ + ...props +}: React.ComponentProps) { + return ( + + + + ); +} -const TooltipTrigger = TooltipPrimitive.Trigger; +function TooltipTrigger({ + ...props +}: React.ComponentProps) { + return ; +} -const TooltipContent = React.forwardRef< - React.ElementRef, - React.ComponentPropsWithoutRef ->(({ className, sideOffset = 4, ...props }, ref) => ( - -)); -TooltipContent.displayName = TooltipPrimitive.Content.displayName; +function TooltipContent({ + className, + sideOffset = 0, + children, + ...props +}: React.ComponentProps) { + return ( + + + {children} + + + + ); +} export { Tooltip, TooltipTrigger, TooltipContent, TooltipProvider }; diff --git a/apps/web/hooks/useFeatureFlags.ts b/apps/web/hooks/useFeatureFlags.ts index 36e7454d87..a41a840413 100644 --- a/apps/web/hooks/useFeatureFlags.ts +++ b/apps/web/hooks/useFeatureFlags.ts @@ -42,6 +42,10 @@ export function useDigestEnabled() { // return true; } +export function useDelayedActionsEnabled() { + return useFeatureFlagEnabled("delayed-actions"); +} + export type TestimonialsVariant = "control" | "senja-widget"; export function useTestimonialsVariant() { diff --git a/apps/web/prisma/migrations/20250626043046_add_scheduled_actions/migration.sql b/apps/web/prisma/migrations/20250626043046_add_scheduled_actions/migration.sql new file mode 100644 index 0000000000..f7c27d97c1 --- /dev/null +++ b/apps/web/prisma/migrations/20250626043046_add_scheduled_actions/migration.sql @@ -0,0 +1,54 @@ +-- CreateEnum +CREATE TYPE IF NOT EXISTS "ScheduledActionStatus" AS ENUM ('PENDING', 'EXECUTING', 'COMPLETED', 'FAILED', 'CANCELLED'); + +-- CreateEnum +CREATE TYPE IF NOT EXISTS "SchedulingStatus" AS ENUM ('PENDING', 'SCHEDULED', 'FAILED'); + + +-- AlterTable +ALTER TABLE "Action" ADD COLUMN IF NOT EXISTS "delayInMinutes" INTEGER; + +-- CreateTable +CREATE TABLE IF NOT EXISTS "ScheduledAction" ( + "id" TEXT NOT NULL, + "executedRuleId" TEXT NOT NULL, + "actionType" "ActionType" NOT NULL, + "messageId" TEXT NOT NULL, + "threadId" TEXT NOT NULL, + "scheduledFor" TIMESTAMP(3) NOT NULL, + "emailAccountId" TEXT NOT NULL, + "status" "ScheduledActionStatus" NOT NULL DEFAULT 'PENDING', + "schedulingStatus" "SchedulingStatus" NOT NULL DEFAULT 'PENDING', + "label" TEXT, + "subject" TEXT, + "content" TEXT, + "to" TEXT, + "cc" TEXT, + "bcc" TEXT, + "url" TEXT, + "scheduledId" TEXT, + "executedAt" TIMESTAMP(3), + "executedActionId" TEXT, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "ScheduledAction_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE UNIQUE INDEX IF NOT EXISTS "ScheduledAction_executedActionId_key" ON "ScheduledAction"("executedActionId"); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS "ScheduledAction_status_scheduledFor_idx" ON "ScheduledAction"("status", "scheduledFor"); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS "ScheduledAction_emailAccountId_messageId_idx" ON "ScheduledAction"("emailAccountId", "messageId"); + +-- AddForeignKey +ALTER TABLE "ScheduledAction" ADD CONSTRAINT "ScheduledAction_executedRuleId_fkey" FOREIGN KEY ("executedRuleId") REFERENCES "ExecutedRule"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "ScheduledAction" ADD CONSTRAINT "ScheduledAction_executedActionId_fkey" FOREIGN KEY ("executedActionId") REFERENCES "ExecutedAction"("id") ON DELETE SET NULL ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "ScheduledAction" ADD CONSTRAINT "ScheduledAction_emailAccountId_fkey" FOREIGN KEY ("emailAccountId") REFERENCES "EmailAccount"("id") ON DELETE CASCADE ON UPDATE CASCADE; diff --git a/apps/web/prisma/schema.prisma b/apps/web/prisma/schema.prisma index 1d3b0bfa96..146d74b088 100644 --- a/apps/web/prisma/schema.prisma +++ b/apps/web/prisma/schema.prisma @@ -118,37 +118,37 @@ model EmailAccount { outboundReplyTracking Boolean @default(false) autoCategorizeSenders Boolean @default(false) - // Digest email settings - digestSchedule Schedule? + digestSchedule Schedule? userId String user User @relation(fields: [userId], references: [id], onDelete: Cascade) accountId String @unique account Account @relation(fields: [accountId], references: [id], onDelete: Cascade) - labels Label[] - rules Rule[] - executedRules ExecutedRule[] - newsletters Newsletter[] - coldEmails ColdEmail[] - groups Group[] - categories Category[] - threadTrackers ThreadTracker[] - cleanupJobs CleanupJob[] - cleanupThreads CleanupThread[] - emailMessages EmailMessage[] - emailTokens EmailToken[] - knowledge Knowledge[] - chats Chat[] - digests Digest[] + labels Label[] + rules Rule[] + executedRules ExecutedRule[] + newsletters Newsletter[] + coldEmails ColdEmail[] + groups Group[] + categories Category[] + threadTrackers ThreadTracker[] + cleanupJobs CleanupJob[] + cleanupThreads CleanupThread[] + emailMessages EmailMessage[] + emailTokens EmailToken[] + knowledge Knowledge[] + chats Chat[] + digests Digest[] + scheduledActions ScheduledAction[] @@index([lastSummaryEmailAt]) } model Digest { - id String @id @default(cuid()) - createdAt DateTime @default(now()) - updatedAt DateTime @updatedAt + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt emailAccountId String emailAccount EmailAccount @relation(fields: [emailAccountId], references: [id], onDelete: Cascade) items DigestItem[] @@ -159,18 +159,18 @@ model Digest { } model DigestItem { - id String @id @default(cuid()) - createdAt DateTime @default(now()) - updatedAt DateTime @updatedAt - messageId String - threadId String - content String @db.Text - digestId String - digest Digest @relation(fields: [digestId], references: [id], onDelete: Cascade) - actionId String? - action ExecutedAction? @relation(fields: [actionId], references: [id]) - coldEmailId String? - coldEmail ColdEmail? @relation(fields: [coldEmailId], references: [id]) + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + messageId String + threadId String + content String @db.Text + digestId String + digest Digest @relation(fields: [digestId], references: [id], onDelete: Cascade) + actionId String? + action ExecutedAction? @relation(fields: [actionId], references: [id]) + coldEmailId String? + coldEmail ColdEmail? @relation(fields: [coldEmailId], references: [id]) @@unique([digestId, threadId, messageId]) } @@ -335,7 +335,6 @@ model Action { ruleId String rule Rule @relation(fields: [ruleId], references: [id], onDelete: Cascade) - // Optional static fields to use for this action label String? subject String? content String? @@ -343,6 +342,7 @@ model Action { cc String? bcc String? url String? + delayInMinutes Int? } model RuleHistory { @@ -394,7 +394,8 @@ model ExecutedRule { emailAccountId String emailAccount EmailAccount @relation(fields: [emailAccountId], references: [id], onDelete: Cascade) - actionItems ExecutedAction[] + actionItems ExecutedAction[] + scheduledActions ScheduledAction[] @@unique([emailAccountId, threadId, messageId], name: "unique_emailAccount_thread_message") @@index([emailAccountId, status, createdAt]) @@ -418,10 +419,45 @@ model ExecutedAction { url String? // additional fields as a result of the action - draftId String? // Gmail draft ID created by DRAFT_EMAIL action - wasDraftSent Boolean? // Tracks if the corresponding draft was sent (true) or ignored/superseded (false) - draftSendLog DraftSendLog? // Will exist if the draft was sent - digestItems DigestItem[] // Relation to digest items created by this action + draftId String? // Gmail draft ID created by DRAFT_EMAIL action + wasDraftSent Boolean? // Tracks if the corresponding draft was sent (true) or ignored/superseded (false) + draftSendLog DraftSendLog? // Will exist if the draft was sent + digestItems DigestItem[] // Relation to digest items created by this action + scheduledAction ScheduledAction? // Reverse relation for delayed actions +} + +model ScheduledAction { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + executedRuleId String + actionType ActionType + messageId String + threadId String + scheduledFor DateTime + emailAccountId String + status ScheduledActionStatus @default(PENDING) + schedulingStatus SchedulingStatus @default(PENDING) + + label String? + subject String? + content String? + to String? + cc String? + bcc String? + url String? + + scheduledId String? + + executedAt DateTime? + executedActionId String? @unique + + executedRule ExecutedRule @relation(fields: [executedRuleId], references: [id], onDelete: Cascade) + executedAction ExecutedAction? @relation(fields: [executedActionId], references: [id]) + emailAccount EmailAccount @relation(fields: [emailAccountId], references: [id], onDelete: Cascade) + + @@index([status, scheduledFor]) + @@index([emailAccountId, messageId]) } // Notes: @@ -851,4 +887,18 @@ enum DigestStatus { PROCESSING SENT FAILED -} \ No newline at end of file +} + +enum ScheduledActionStatus { + PENDING + EXECUTING + COMPLETED + FAILED + CANCELLED +} + +enum SchedulingStatus { + PENDING + SCHEDULED + FAILED +} diff --git a/apps/web/utils/action-item.ts b/apps/web/utils/action-item.ts index 7da80db4d4..07a1c36895 100644 --- a/apps/web/utils/action-item.ts +++ b/apps/web/utils/action-item.ts @@ -162,13 +162,21 @@ export function getActionFields(fields: Action | ExecutedAction | undefined) { type ActionFieldsSelection = Pick< Prisma.ActionCreateInput, - "type" | "label" | "subject" | "content" | "to" | "cc" | "bcc" | "url" + | "type" + | "label" + | "subject" + | "content" + | "to" + | "cc" + | "bcc" + | "url" + | "delayInMinutes" >; export function sanitizeActionFields( action: Partial & { type: ActionType }, ): ActionFieldsSelection { - const base = { + const base: ActionFieldsSelection = { type: action.type, label: null, subject: null, @@ -177,6 +185,7 @@ export function sanitizeActionFields( cc: null, bcc: null, url: null, + delayInMinutes: action.delayInMinutes || null, }; switch (action.type) { diff --git a/apps/web/utils/actions/rule.ts b/apps/web/utils/actions/rule.ts index 78349df9a7..5b4939bffb 100644 --- a/apps/web/utils/actions/rule.ts +++ b/apps/web/utils/actions/rule.ts @@ -45,6 +45,7 @@ import { getGmailClientForEmail } from "@/utils/account"; import { getEmailAccountWithAi } from "@/utils/user/get"; import { prefixPath } from "@/utils/path"; import { createRuleHistory } from "@/utils/rule/rule-history"; +import { ONE_WEEK_MINUTES } from "@/utils/date"; const logger = createScopedLogger("actions/rule"); @@ -76,7 +77,17 @@ export const createRuleAction = actionClient ? { createMany: { data: actions.map( - ({ type, label, subject, content, to, cc, bcc, url }) => { + ({ + type, + label, + subject, + content, + to, + cc, + bcc, + url, + delayInMinutes, + }) => { return sanitizeActionFields({ type, label: label?.value, @@ -86,6 +97,7 @@ export const createRuleAction = actionClient cc: cc?.value, bcc: bcc?.value, url: url?.value, + delayInMinutes, }); }, ), @@ -217,6 +229,7 @@ export const updateRuleAction = actionClient cc: a.cc?.value, bcc: a.bcc?.value, url: a.url?.value, + delayInMinutes: a.delayInMinutes, }), }); }), @@ -235,6 +248,7 @@ export const updateRuleAction = actionClient cc: a.cc?.value, bcc: a.bcc?.value, url: a.url?.value, + delayInMinutes: a.delayInMinutes, }), ruleId: id, }; @@ -466,7 +480,7 @@ export const createRulesOnboardingAction = actionClient const isSet = ( value: string | undefined, - ): value is "label" | "label_archive" => + ): value is "label" | "label_archive" | "label_archive_delayed" => value !== "none" && value !== undefined; // cold email blocker @@ -519,7 +533,7 @@ export const createRulesOnboardingAction = actionClient instructions: string, promptFileInstructions: string, runOnThreads: boolean, - categoryAction: "label" | "label_archive", + categoryAction: "label" | "label_archive" | "label_archive_delayed", label: string, systemType: SystemType, emailAccountId: string, @@ -542,7 +556,14 @@ export const createRulesOnboardingAction = actionClient { type: ActionType.LABEL, label }, ...(categoryAction === "label_archive" ? [{ type: ActionType.ARCHIVE }] - : []), + : categoryAction === "label_archive_delayed" + ? [ + { + type: ActionType.ARCHIVE, + delayInMinutes: ONE_WEEK_MINUTES, + }, + ] + : []), ...(hasDigest ? [{ type: ActionType.DIGEST }] : []), ], }, @@ -574,7 +595,14 @@ export const createRulesOnboardingAction = actionClient { type: ActionType.LABEL, label }, ...(categoryAction === "label_archive" ? [{ type: ActionType.ARCHIVE }] - : []), + : categoryAction === "label_archive_delayed" + ? [ + { + type: ActionType.ARCHIVE, + delayInMinutes: ONE_WEEK_MINUTES, + }, + ] + : []), ...(hasDigest ? [{ type: ActionType.DIGEST }] : []), ], }, @@ -591,7 +619,11 @@ export const createRulesOnboardingAction = actionClient rules.push( `${promptFileInstructions}${ - categoryAction === "label_archive" ? " and archive them" : "" + categoryAction === "label_archive" + ? " and archive them" + : categoryAction === "label_archive_delayed" + ? " and archive them after a week" + : "" }.`, ); } diff --git a/apps/web/utils/actions/rule.validation.ts b/apps/web/utils/actions/rule.validation.ts index f1a56ef7a5..0ed2d1552e 100644 --- a/apps/web/utils/actions/rule.validation.ts +++ b/apps/web/utils/actions/rule.validation.ts @@ -6,6 +6,13 @@ import { SystemType, } from "@prisma/client"; import { ConditionType } from "@/utils/config"; +import { NINETY_DAYS_MINUTES } from "@/utils/date"; + +export const delayInMinutesSchema = z + .number() + .min(1, "Minimum supported delay is 1 minute") + .max(NINETY_DAYS_MINUTES, "Maximum supported delay is 90 days") + .nullish(); const zodActionType = z.enum([ ActionType.ARCHIVE, @@ -73,6 +80,7 @@ const zodAction = z cc: zodField, bcc: zodField, url: zodField, + delayInMinutes: delayInMinutesSchema, }) .superRefine((data, ctx) => { if (data.type === ActionType.LABEL && !data.label?.value?.trim()) { @@ -163,7 +171,12 @@ export type UpdateRuleSettingsBody = z.infer; export const enableDraftRepliesBody = z.object({ enable: z.boolean() }); export type EnableDraftRepliesBody = z.infer; -const categoryAction = z.enum(["label", "label_archive", "none"]); +const categoryAction = z.enum([ + "label", + "label_archive", + "label_archive_delayed", + "none", +]); export type CategoryAction = z.infer; const categoryConfig = z.object({ diff --git a/apps/web/utils/ai/assistant/chat.ts b/apps/web/utils/ai/assistant/chat.ts index fbb6291fa2..ffa97833b0 100644 --- a/apps/web/utils/ai/assistant/chat.ts +++ b/apps/web/utils/ai/assistant/chat.ts @@ -15,6 +15,7 @@ import { saveLearnedPatterns } from "@/utils/rule/learned-patterns"; import { posthogCaptureEvent } from "@/utils/posthog"; import { chatCompletionStream } from "@/utils/llms"; import { filterNullProperties } from "@/utils"; +import { delayInMinutesSchema } from "@/utils/actions/rule.validation"; const logger = createScopedLogger("ai/assistant/chat"); @@ -91,6 +92,7 @@ const updateRuleActionsSchema = z.object({ bcc: z.string().nullish(), subject: z.string().nullish(), }), + delayInMinutes: delayInMinutesSchema, }), ), }); @@ -114,6 +116,7 @@ export type UpdateRuleActionsResult = { updatedActions?: Array<{ type: string; fields: Record; + delayInMinutes?: number | null; }>; error?: string; }; @@ -754,6 +757,7 @@ Examples: content: action.fields?.content ?? null, webhookUrl: action.fields?.webhookUrl ?? null, }, + delayInMinutes: action.delayInMinutes ?? null, })), }); diff --git a/apps/web/utils/ai/choose-rule/run-rules.ts b/apps/web/utils/ai/choose-rule/run-rules.ts index 3925c9dd2f..19f00ba97e 100644 --- a/apps/web/utils/ai/choose-rule/run-rules.ts +++ b/apps/web/utils/ai/choose-rule/run-rules.ts @@ -17,6 +17,11 @@ import type { MatchReason } from "@/utils/ai/choose-rule/types"; import { sanitizeActionFields } from "@/utils/action-item"; import { extractEmailAddress } from "@/utils/email"; import { analyzeSenderPattern } from "@/app/api/ai/analyze-sender-pattern/call-analyze-pattern-api"; +import { + scheduleDelayedActions, + cancelScheduledActions, +} from "@/utils/scheduled-actions/scheduler"; +import groupBy from "lodash/groupBy"; const logger = createScopedLogger("ai-run-rules"); @@ -95,6 +100,15 @@ async function executeMatchedRule( gmail, }); + if (!isTest) { + } + + const { immediateActions, delayedActions } = groupBy(actionItems, (item) => + item.delayInMinutes != null && item.delayInMinutes > 0 + ? "delayedActions" + : "immediateActions", + ); + // handle action const executedRule = isTest ? undefined @@ -106,12 +120,30 @@ async function executeMatchedRule( }, { rule, - actionItems, + actionItems: immediateActions, // Only save immediate actions as ExecutedActions reason, }, ); - const shouldExecute = executedRule && rule.automate; + if (executedRule && delayedActions.length > 0 && !isTest) { + // Attempts to cancel any existing scheduled actions to avoid duplicates + await cancelScheduledActions({ + emailAccountId: emailAccount.id, + messageId: message.id, + threadId: message.threadId, + reason: "Superseded by new rule execution", + }); + await scheduleDelayedActions({ + executedRuleId: executedRule.id, + actionItems: delayedActions, + messageId: message.id, + threadId: message.threadId, + emailAccountId: emailAccount.id, + }); + } + + const shouldExecute = + executedRule && rule.automate && immediateActions.length > 0; if (shouldExecute) { await executeAct({ @@ -184,7 +216,12 @@ async function saveExecutedRule( const data: Prisma.ExecutedRuleCreateInput = { actionItems: { createMany: { - data: actionItems?.map(sanitizeActionFields) || [], + data: + actionItems?.map((item) => { + const { delayInMinutes, ...executedActionFields } = + sanitizeActionFields(item); + return executedActionFields; + }) || [], }, }, messageId, diff --git a/apps/web/utils/ai/rule/create-rule-schema.ts b/apps/web/utils/ai/rule/create-rule-schema.ts index 619391f785..f6febbaf37 100644 --- a/apps/web/utils/ai/rule/create-rule-schema.ts +++ b/apps/web/utils/ai/rule/create-rule-schema.ts @@ -4,6 +4,7 @@ import { CategoryFilterType, LogicalOperator, } from "@prisma/client"; +import { delayInMinutesSchema } from "@/utils/actions/rule.validation"; const conditionSchema = z .object({ @@ -76,6 +77,7 @@ const actionSchema = z.object({ .describe( "The fields to use for the action. Static text can be combined with dynamic values using double braces {{}}. For example: 'Hi {{sender's name}}' or 'Re: {{subject}}' or '{{when I'm available for a meeting}}'. Dynamic values will be replaced with actual email data when the rule is executed. Dynamic values are generated in real time by the AI. Only use dynamic values where absolutely necessary. Otherwise, use plain static text. A field can be also be fully static or fully dynamic.", ), + delayInMinutes: delayInMinutesSchema, }); export const createRuleSchema = z.object({ diff --git a/apps/web/utils/ai/types.ts b/apps/web/utils/ai/types.ts index 220674d5f7..7857ca451b 100644 --- a/apps/web/utils/ai/types.ts +++ b/apps/web/utils/ai/types.ts @@ -22,4 +22,5 @@ export type ActionItem = { cc?: ExecutedAction["cc"]; bcc?: ExecutedAction["bcc"]; url?: ExecutedAction["url"]; + delayInMinutes?: number | null; }; diff --git a/apps/web/utils/date.ts b/apps/web/utils/date.ts index df4051c393..366a9edad9 100644 --- a/apps/web/utils/date.ts +++ b/apps/web/utils/date.ts @@ -7,6 +7,11 @@ export const ONE_DAY_MS = ONE_HOUR_MS * 24; export const ONE_MONTH_MS = ONE_DAY_MS * 30; export const ONE_YEAR_MS = ONE_DAY_MS * 365; +export const ONE_HOUR_MINUTES = 60; +export const ONE_DAY_MINUTES = ONE_HOUR_MINUTES * 24; +export const ONE_WEEK_MINUTES = ONE_DAY_MINUTES * 7; +export const NINETY_DAYS_MINUTES = ONE_DAY_MINUTES * 90; + /** * Formats a date into a short string. * - If the date is today, returns the time (e.g., "3:44 PM"). diff --git a/apps/web/utils/delayed-actions.ts b/apps/web/utils/delayed-actions.ts new file mode 100644 index 0000000000..a28389f7f6 --- /dev/null +++ b/apps/web/utils/delayed-actions.ts @@ -0,0 +1,17 @@ +import { ActionType } from "@prisma/client"; + +// Action types that support delayed execution +const SUPPORTED_DELAYED_ACTIONS: ActionType[] = [ + ActionType.ARCHIVE, + ActionType.LABEL, + ActionType.REPLY, + ActionType.SEND_EMAIL, + ActionType.FORWARD, + ActionType.DRAFT_EMAIL, + ActionType.CALL_WEBHOOK, + ActionType.MARK_READ, +]; + +export function canActionBeDelayed(actionType: ActionType): boolean { + return SUPPORTED_DELAYED_ACTIONS.includes(actionType); +} diff --git a/apps/web/utils/rule/rule.ts b/apps/web/utils/rule/rule.ts index a60aa9a784..944ae83e04 100644 --- a/apps/web/utils/rule/rule.ts +++ b/apps/web/utils/rule/rule.ts @@ -347,6 +347,7 @@ function mapActionFields( subject: a.fields?.subject, content: a.fields?.content, url: a.fields?.webhookUrl, + delayInMinutes: a.delayInMinutes, }), ); } diff --git a/apps/web/utils/scheduled-actions/executor.test.ts b/apps/web/utils/scheduled-actions/executor.test.ts new file mode 100644 index 0000000000..318e590048 --- /dev/null +++ b/apps/web/utils/scheduled-actions/executor.test.ts @@ -0,0 +1,279 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { ActionType, ScheduledActionStatus } from "@prisma/client"; +import { executeScheduledAction } from "./executor"; +import prisma from "@/utils/__mocks__/prisma"; + +// Run with: pnpm test utils/scheduled-actions/executor.test.ts + +vi.mock("server-only", () => ({})); +vi.mock("@/utils/prisma"); +vi.mock("@/utils/gmail/client", () => ({ + getGmailClientWithRefresh: vi.fn(), +})); +vi.mock("@/utils/user/get", () => ({ + getEmailAccountWithAiAndTokens: vi.fn(), +})); +vi.mock("@/utils/ai/choose-rule/execute", () => ({ + executeAct: vi.fn(), +})); +vi.mock("@/utils/ai/actions", () => ({ + runActionFunction: vi.fn(), +})); +vi.mock("@/utils/gmail/message", () => ({ + getMessage: vi.fn(), +})); +vi.mock("@/utils/mail", () => ({ + parseMessage: vi.fn(), +})); + +describe("executor", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + describe("executeScheduledAction", () => { + const mockScheduledAction = { + id: "scheduled-action-123", + executedRuleId: "rule-123", + actionType: ActionType.ARCHIVE, + messageId: "msg-123", + threadId: "thread-123", + emailAccountId: "account-123", + scheduledFor: new Date("2024-01-01T12:00:00Z"), + status: ScheduledActionStatus.PENDING, + label: null, + subject: null, + content: null, + to: null, + cc: null, + bcc: null, + url: null, + createdAt: new Date(), + updatedAt: new Date(), + executedAt: null, + executedActionId: null, + } as any; + + it("should successfully execute action and mark as completed", async () => { + prisma.scheduledAction.update.mockResolvedValue({ + ...mockScheduledAction, + status: ScheduledActionStatus.COMPLETED, + } as any); + prisma.executedAction.create.mockResolvedValue({ + id: "executed-action-123", + type: ActionType.ARCHIVE, + label: null, + createdAt: new Date(), + updatedAt: new Date(), + executedRuleId: "rule-123", + subject: null, + content: null, + to: null, + cc: null, + bcc: null, + url: null, + draftId: null, + wasDraftSent: null, + }); + prisma.executedRule.findUnique.mockResolvedValue({ + id: "rule-123", + createdAt: new Date(), + updatedAt: new Date(), + messageId: "msg-123", + threadId: "thread-123", + emailAccountId: "account-123", + status: "PENDING", + automated: true, + reason: null, + ruleId: null, + } as any); + prisma.scheduledAction.count.mockResolvedValue(0); + prisma.executedRule.update.mockResolvedValue({ + id: "rule-123", + createdAt: new Date(), + updatedAt: new Date(), + messageId: "msg-123", + threadId: "thread-123", + emailAccountId: "account-123", + status: "APPLIED", + automated: true, + reason: null, + ruleId: null, + }); + + const { runActionFunction } = await import("@/utils/ai/actions"); + const { getEmailAccountWithAiAndTokens } = await import( + "@/utils/user/get" + ); + const { getGmailClientWithRefresh } = await import( + "@/utils/gmail/client" + ); + const { getMessage } = await import("@/utils/gmail/message"); + const { parseMessage } = await import("@/utils/mail"); + + (runActionFunction as any).mockResolvedValue(undefined); + (parseMessage as any).mockReturnValue({ + id: "msg-123", + threadId: "thread-123", + headers: {}, + textPlain: "test content", + textHtml: "

test content

", + attachments: [], + internalDate: "1234567890", + snippet: "", + historyId: "", + inline: [], + }); + (getEmailAccountWithAiAndTokens as any).mockResolvedValue({ + id: "account-123", + userId: "user-123", + email: "test@example.com", + tokens: { + access_token: "token", + refresh_token: "refresh", + expires_at: Date.now() + 3600000, + }, + }); + (getMessage as any).mockResolvedValue({ + id: "msg-123", + threadId: "thread-123", + }); + (getGmailClientWithRefresh as any).mockResolvedValue({ + users: { + messages: { + get: vi.fn().mockResolvedValue({ + data: { id: "msg-123", threadId: "thread-123" }, + }), + }, + }, + }); + + const result = await executeScheduledAction(mockScheduledAction); + + expect(result.success).toBe(true); + expect(prisma.scheduledAction.update).toHaveBeenCalledWith({ + where: { id: "scheduled-action-123" }, + data: { + status: ScheduledActionStatus.COMPLETED, + executedAt: expect.any(Date), + executedActionId: "executed-action-123", + }, + }); + }); + + it("should handle execution errors and mark as failed", async () => { + prisma.scheduledAction.update.mockResolvedValue({ + ...mockScheduledAction, + status: ScheduledActionStatus.FAILED, + } as any); + prisma.executedAction.create.mockResolvedValue({ + id: "executed-action-123", + type: ActionType.ARCHIVE, + label: null, + createdAt: new Date(), + updatedAt: new Date(), + executedRuleId: "rule-123", + subject: null, + content: null, + to: null, + cc: null, + bcc: null, + url: null, + draftId: null, + wasDraftSent: null, + }); + prisma.executedRule.findUnique.mockResolvedValue({ + id: "rule-123", + createdAt: new Date(), + updatedAt: new Date(), + messageId: "msg-123", + threadId: "thread-123", + emailAccountId: "account-123", + status: "PENDING", + automated: true, + reason: null, + ruleId: null, + } as any); + + const { runActionFunction } = await import("@/utils/ai/actions"); + const { getEmailAccountWithAiAndTokens } = await import( + "@/utils/user/get" + ); + const { getGmailClientWithRefresh } = await import( + "@/utils/gmail/client" + ); + const { getMessage } = await import("@/utils/gmail/message"); + const { parseMessage } = await import("@/utils/mail"); + + (runActionFunction as any).mockRejectedValue( + new Error("Execution failed"), + ); + (parseMessage as any).mockReturnValue({ + id: "msg-123", + threadId: "thread-123", + headers: {}, + textPlain: "test content", + textHtml: "

test content

", + attachments: [], + internalDate: "1234567890", + snippet: "", + historyId: "", + inline: [], + }); + (getEmailAccountWithAiAndTokens as any).mockResolvedValue({ + id: "account-123", + userId: "user-123", + email: "test@example.com", + tokens: { + access_token: "token", + refresh_token: "refresh", + expires_at: Date.now() + 3600000, + }, + }); + (getMessage as any).mockResolvedValue({ + id: "msg-123", + threadId: "thread-123", + }); + (getGmailClientWithRefresh as any).mockResolvedValue({ + users: { + messages: { + get: vi.fn().mockResolvedValue({ + data: { id: "msg-123", threadId: "thread-123" }, + }), + }, + }, + }); + + const result = await executeScheduledAction(mockScheduledAction); + + expect(result.success).toBe(false); + expect(prisma.scheduledAction.update).toHaveBeenCalledWith({ + where: { id: "scheduled-action-123" }, + data: { + status: ScheduledActionStatus.FAILED, + }, + }); + }); + + it("should handle account not found errors", async () => { + prisma.scheduledAction.update.mockResolvedValue({ + ...mockScheduledAction, + status: ScheduledActionStatus.EXECUTING, + } as any); + + const { getEmailAccountWithAiAndTokens } = await import( + "@/utils/user/get" + ); + (getEmailAccountWithAiAndTokens as any).mockResolvedValue(null); + + await executeScheduledAction(mockScheduledAction); + + expect(prisma.scheduledAction.update).toHaveBeenCalledWith({ + where: { id: "scheduled-action-123" }, + data: { + status: ScheduledActionStatus.FAILED, + }, + }); + }); + }); +}); diff --git a/apps/web/utils/scheduled-actions/executor.ts b/apps/web/utils/scheduled-actions/executor.ts new file mode 100644 index 0000000000..f3bb5557cd --- /dev/null +++ b/apps/web/utils/scheduled-actions/executor.ts @@ -0,0 +1,306 @@ +import type { gmail_v1 } from "@googleapis/gmail"; +import { + ActionType, + ExecutedRuleStatus, + ScheduledActionStatus, + type ScheduledAction, +} from "@prisma/client"; +import prisma from "@/utils/prisma"; +import { createScopedLogger } from "@/utils/logger"; +import { getEmailAccountWithAiAndTokens } from "@/utils/user/get"; +import { getGmailClientWithRefresh } from "@/utils/gmail/client"; +import { runActionFunction } from "@/utils/ai/actions"; +import type { ActionItem, EmailForAction } from "@/utils/ai/types"; +import type { ParsedMessage } from "@/utils/types"; +import { getMessage } from "@/utils/gmail/message"; +import { parseMessage } from "@/utils/mail"; + +const logger = createScopedLogger("scheduled-actions-executor"); + +/** + * Execute a scheduled action + */ +export async function executeScheduledAction(scheduledAction: ScheduledAction) { + logger.info("Executing scheduled action", { + scheduledActionId: scheduledAction.id, + actionType: scheduledAction.actionType, + messageId: scheduledAction.messageId, + emailAccountId: scheduledAction.emailAccountId, + }); + + try { + // Get email account with tokens + const emailAccount = await getEmailAccountWithAiAndTokens({ + emailAccountId: scheduledAction.emailAccountId, + }); + if (!emailAccount) { + throw new Error("Email account not found"); + } + + // Get Gmail client + const gmail = await getGmailClientWithRefresh({ + accessToken: emailAccount.tokens.access_token, + refreshToken: emailAccount.tokens.refresh_token, + expiresAt: emailAccount.tokens.expires_at, + emailAccountId: emailAccount.id, + }); + + // Validate email still exists and get current state + const emailMessage = await validateEmailState(gmail, scheduledAction); + if (!emailMessage) { + await markActionCompleted( + scheduledAction.id, + null, + "Email no longer exists", + ); + return { success: true, reason: "Email no longer exists" }; + } + + // Create ActionItem from scheduled action data + const actionItem: ActionItem = { + id: scheduledAction.id, // Use scheduled action ID temporarily + type: scheduledAction.actionType, + label: scheduledAction.label, + subject: scheduledAction.subject, + content: scheduledAction.content, + to: scheduledAction.to, + cc: scheduledAction.cc, + bcc: scheduledAction.bcc, + url: scheduledAction.url, + }; + + // Execute the action + const executedAction = await executeDelayedAction({ + gmail, + actionItem, + emailMessage, + emailAccount: { + email: emailAccount.email, + userId: emailAccount.userId, + id: emailAccount.id, + }, + scheduledAction, + }); + + await markActionCompleted(scheduledAction.id, executedAction?.id); + + // Check if all scheduled actions for this ExecutedRule are complete + await checkAndCompleteExecutedRule(scheduledAction.executedRuleId); + + logger.info("Successfully executed scheduled action", { + scheduledActionId: scheduledAction.id, + executedActionId: executedAction?.id, + }); + + return { success: true, executedActionId: executedAction?.id }; + } catch (error: unknown) { + logger.error("Failed to execute scheduled action", { + scheduledActionId: scheduledAction.id, + error, + }); + + await markActionFailed(scheduledAction.id, error); + return { success: false, error }; + } +} + +/** + * Validate that the email still exists and return current state + */ +async function validateEmailState( + gmail: gmail_v1.Gmail, + scheduledAction: ScheduledAction, +): Promise { + try { + const message = await getMessage(scheduledAction.messageId, gmail, "full"); + + if (!message) { + logger.info("Email no longer exists", { + messageId: scheduledAction.messageId, + scheduledActionId: scheduledAction.id, + }); + return null; + } + + // Parse the message to get the correct format + const parsedMessage = parseMessage(message); + + // Convert to EmailForAction format + const emailForAction: EmailForAction = { + threadId: parsedMessage.threadId, + id: parsedMessage.id, + headers: parsedMessage.headers, + textPlain: parsedMessage.textPlain || "", + textHtml: parsedMessage.textHtml || "", + attachments: parsedMessage.attachments || [], + internalDate: parsedMessage.internalDate, + }; + + return emailForAction; + } catch (error: unknown) { + // Check for Gmail's standard "not found" error message + if ( + error instanceof Error && + error.message === "Requested entity was not found." + ) { + logger.info("Email not found during validation", { + messageId: scheduledAction.messageId, + scheduledActionId: scheduledAction.id, + }); + return null; + } + + throw error; + } +} + +/** + * Execute the delayed action using existing action execution logic + */ +async function executeDelayedAction({ + gmail, + actionItem, + emailMessage, + emailAccount, + scheduledAction, +}: { + gmail: gmail_v1.Gmail; + actionItem: ActionItem; + emailMessage: EmailForAction; + emailAccount: { email: string; userId: string; id: string }; + scheduledAction: ScheduledAction; +}) { + // Create ExecutedAction record first to maintain audit trail + const executedAction = await prisma.executedAction.create({ + data: { + type: actionItem.type, + label: actionItem.label, + subject: actionItem.subject, + content: actionItem.content, + to: actionItem.to, + cc: actionItem.cc, + bcc: actionItem.bcc, + url: actionItem.url, + executedRule: { + connect: { id: scheduledAction.executedRuleId }, + }, + }, + }); + + // Get the complete ExecutedRule for context + const executedRule = await prisma.executedRule.findUnique({ + where: { id: scheduledAction.executedRuleId }, + include: { actionItems: true }, + }); + + if (!executedRule) { + throw new Error(`ExecutedRule ${scheduledAction.executedRuleId} not found`); + } + + // Create a ParsedMessage from EmailForAction + const parsedMessage: ParsedMessage = { + id: emailMessage.id, + threadId: emailMessage.threadId, + headers: emailMessage.headers, + textPlain: emailMessage.textPlain, + textHtml: emailMessage.textHtml, + attachments: emailMessage.attachments, + internalDate: emailMessage.internalDate, + // Required ParsedMessage fields that aren't used in action execution + snippet: "", + historyId: "", + inline: [], + }; + + logger.info("Executing delayed action", { + actionType: executedAction.type, + executedActionId: executedAction.id, + messageId: parsedMessage.id, + }); + + // Execute the specific action directly using runActionFunction + await runActionFunction({ + gmail, + email: parsedMessage, + action: executedAction, + userEmail: emailAccount.email, + userId: emailAccount.userId, + emailAccountId: emailAccount.id, + executedRule, + }); + + logger.info("Successfully executed delayed action", { + actionType: executedAction.type, + executedActionId: executedAction.id, + }); + + return executedAction; +} + +/** + * Mark scheduled action as completed + */ +async function markActionCompleted( + scheduledActionId: string, + executedActionId: string | null | undefined, + reason?: string, +) { + await prisma.scheduledAction.update({ + where: { id: scheduledActionId }, + data: { + status: ScheduledActionStatus.COMPLETED, + executedAt: new Date(), + executedActionId: executedActionId || undefined, + }, + }); + + logger.info("Marked scheduled action as completed", { + scheduledActionId, + executedActionId, + reason, + }); +} + +/** + * Mark scheduled action as failed + */ +async function markActionFailed(scheduledActionId: string, error: unknown) { + await prisma.scheduledAction.update({ + where: { id: scheduledActionId }, + data: { + status: ScheduledActionStatus.FAILED, + }, + }); + + logger.warn("Marked scheduled action as failed", { + scheduledActionId, + error, + }); +} + +/** + * Check if all scheduled actions for an ExecutedRule are complete + * and update the ExecutedRule status accordingly + */ +async function checkAndCompleteExecutedRule(executedRuleId: string) { + const pendingActions = await prisma.scheduledAction.count({ + where: { + executedRuleId, + status: { + in: [ScheduledActionStatus.PENDING, ScheduledActionStatus.EXECUTING], + }, + }, + }); + + if (pendingActions === 0) { + // All scheduled actions are complete, update ExecutedRule status + await prisma.executedRule.update({ + where: { id: executedRuleId }, + data: { status: ExecutedRuleStatus.APPLIED }, + }); + + logger.info("Completed ExecutedRule - all scheduled actions finished", { + executedRuleId, + }); + } +} diff --git a/apps/web/utils/scheduled-actions/scheduler.test.ts b/apps/web/utils/scheduled-actions/scheduler.test.ts new file mode 100644 index 0000000000..eee11ee725 --- /dev/null +++ b/apps/web/utils/scheduled-actions/scheduler.test.ts @@ -0,0 +1,134 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { ActionType, ScheduledActionStatus } from "@prisma/client"; +import { cancelScheduledActions } from "./scheduler"; +import { canActionBeDelayed } from "@/utils/delayed-actions"; +import prisma from "@/utils/__mocks__/prisma"; + +vi.mock("server-only", () => ({})); +vi.mock("@/utils/prisma"); +vi.mock("@/utils/upstash", () => ({ + qstash: { + messages: { + delete: vi.fn().mockResolvedValue({}), + }, + }, +})); + +describe("scheduler", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + describe("canActionBeDelayed", () => { + it("should return true for supported actions", () => { + expect(canActionBeDelayed(ActionType.ARCHIVE)).toBe(true); + expect(canActionBeDelayed(ActionType.LABEL)).toBe(true); + expect(canActionBeDelayed(ActionType.MARK_READ)).toBe(true); + expect(canActionBeDelayed(ActionType.REPLY)).toBe(true); + expect(canActionBeDelayed(ActionType.SEND_EMAIL)).toBe(true); + expect(canActionBeDelayed(ActionType.FORWARD)).toBe(true); + expect(canActionBeDelayed(ActionType.DRAFT_EMAIL)).toBe(true); + expect(canActionBeDelayed(ActionType.CALL_WEBHOOK)).toBe(true); + }); + + it("should return false for unsupported actions", () => { + expect(canActionBeDelayed(ActionType.MARK_SPAM)).toBe(false); + expect(canActionBeDelayed(ActionType.TRACK_THREAD)).toBe(false); + expect(canActionBeDelayed(ActionType.DIGEST)).toBe(false); + }); + }); + + describe("cancelScheduledActions", () => { + it("should cancel scheduled actions for a message", async () => { + // Mock finding actions to cancel + prisma.scheduledAction.findMany.mockResolvedValue([ + { id: "action-1", scheduledId: "qstash-msg-1" }, + { id: "action-2", scheduledId: "qstash-msg-2" }, + ] as any); + + // Mock updating actions as cancelled + prisma.scheduledAction.updateMany.mockResolvedValue({ count: 2 }); + + const result = await cancelScheduledActions({ + messageId: "msg-123", + emailAccountId: "account-123", + }); + + expect(prisma.scheduledAction.findMany).toHaveBeenCalledWith({ + where: { + emailAccountId: "account-123", + messageId: "msg-123", + status: ScheduledActionStatus.PENDING, + }, + select: { + id: true, + scheduledId: true, + }, + }); + + expect(prisma.scheduledAction.updateMany).toHaveBeenCalledWith({ + where: { + emailAccountId: "account-123", + messageId: "msg-123", + status: ScheduledActionStatus.PENDING, + }, + data: { + status: ScheduledActionStatus.CANCELLED, + }, + }); + + expect(result).toBe(2); + }); + + it("should return zero when no actions to cancel", async () => { + prisma.scheduledAction.findMany.mockResolvedValue([]); + prisma.scheduledAction.updateMany.mockResolvedValue({ count: 0 }); + + const result = await cancelScheduledActions({ + messageId: "msg-456", + emailAccountId: "account-123", + }); + + expect(result).toBe(0); + }); + + it("should include threadId when provided", async () => { + prisma.scheduledAction.findMany.mockResolvedValue([ + { id: "action-1", scheduledId: "qstash-msg-1" }, + ] as any); + prisma.scheduledAction.updateMany.mockResolvedValue({ count: 1 }); + + await cancelScheduledActions({ + messageId: "msg-123", + emailAccountId: "account-123", + threadId: "thread-123", + reason: "Custom reason", + }); + + expect(prisma.scheduledAction.findMany).toHaveBeenCalledWith({ + where: { + emailAccountId: "account-123", + messageId: "msg-123", + threadId: "thread-123", + status: ScheduledActionStatus.PENDING, + }, + select: { + id: true, + scheduledId: true, + }, + }); + + expect(prisma.scheduledAction.updateMany).toHaveBeenCalledWith({ + where: { + emailAccountId: "account-123", + messageId: "msg-123", + threadId: "thread-123", + status: ScheduledActionStatus.PENDING, + }, + data: { + status: ScheduledActionStatus.CANCELLED, + }, + }); + }); + }); +}); diff --git a/apps/web/utils/scheduled-actions/scheduler.ts b/apps/web/utils/scheduled-actions/scheduler.ts new file mode 100644 index 0000000000..6fdac0edd0 --- /dev/null +++ b/apps/web/utils/scheduled-actions/scheduler.ts @@ -0,0 +1,360 @@ +import { ScheduledActionStatus } from "@prisma/client"; +import prisma from "@/utils/prisma"; +import type { ActionItem } from "@/utils/ai/types"; +import { createScopedLogger } from "@/utils/logger"; +import { canActionBeDelayed } from "@/utils/delayed-actions"; +import { env } from "@/env"; +import { getCronSecretHeader } from "@/utils/cron"; +import { Client } from "@upstash/qstash"; +import { addMinutes, getUnixTime } from "date-fns"; + +const logger = createScopedLogger("qstash-scheduled-actions"); + +interface ScheduledActionPayload { + scheduledActionId: string; +} + +function getQstashClient() { + if (!env.QSTASH_TOKEN) return null; + return new Client({ token: env.QSTASH_TOKEN }); +} + +export async function createScheduledAction({ + executedRuleId, + actionItem, + messageId, + threadId, + emailAccountId, + scheduledFor, +}: { + executedRuleId: string; + actionItem: ActionItem; + messageId: string; + threadId: string; + emailAccountId: string; + scheduledFor: Date; +}) { + if (!canActionBeDelayed(actionItem.type)) { + throw new Error( + `Action type ${actionItem.type} is not supported for delayed execution`, + ); + } + + if (actionItem.delayInMinutes == null || actionItem.delayInMinutes <= 0) { + throw new Error( + `Invalid delayInMinutes: ${actionItem.delayInMinutes}. Must be a positive number.`, + ); + } + + try { + const scheduledAction = await prisma.scheduledAction.create({ + data: { + executedRuleId, + actionType: actionItem.type, + messageId, + threadId, + emailAccountId, + scheduledFor, + status: ScheduledActionStatus.PENDING, + // Store ActionItem data for later execution + label: actionItem.label, + subject: actionItem.subject, + content: actionItem.content, + to: actionItem.to, + cc: actionItem.cc, + bcc: actionItem.bcc, + url: actionItem.url, + }, + }); + + const payload: ScheduledActionPayload = { + scheduledActionId: scheduledAction.id, + }; + + const deduplicationId = `scheduled-action-${scheduledAction.id}`; + + const scheduledId = await scheduleMessage({ + payload, + delayInMinutes: actionItem.delayInMinutes, + deduplicationId, + }); + + if (scheduledId) { + await prisma.scheduledAction.update({ + where: { id: scheduledAction.id }, + data: { + scheduledId, + schedulingStatus: "SCHEDULED" as const, + }, + }); + } + + logger.info("Created and scheduled action with QStash", { + scheduledActionId: scheduledAction.id, + actionType: actionItem.type, + scheduledFor, + messageId, + threadId, + deduplicationId, + }); + + return scheduledAction; + } catch (error) { + logger.error("Failed to create QStash scheduled action", { + error, + executedRuleId, + actionType: actionItem.type, + messageId, + threadId, + }); + throw error; + } +} + +export async function scheduleDelayedActions({ + executedRuleId, + actionItems, + messageId, + threadId, + emailAccountId, +}: { + executedRuleId: string; + actionItems: ActionItem[]; + messageId: string; + threadId: string; + emailAccountId: string; +}) { + const delayedActions = actionItems.filter( + (item) => + item.delayInMinutes != null && + item.delayInMinutes > 0 && + canActionBeDelayed(item.type), + ); + + if (delayedActions.length === 0) { + return []; + } + + const scheduledActions = []; + + for (const actionItem of delayedActions) { + const scheduledFor = addMinutes(new Date(), actionItem.delayInMinutes!); + + const scheduledAction = await createScheduledAction({ + executedRuleId, + actionItem, + messageId, + threadId, + emailAccountId, + scheduledFor, + }); + + scheduledActions.push(scheduledAction); + } + + logger.info("Scheduled delayed actions with QStash", { + count: scheduledActions.length, + executedRuleId, + messageId, + threadId, + }); + + return scheduledActions; +} + +export async function cancelScheduledActions({ + emailAccountId, + messageId, + threadId, + reason = "Superseded by new rule", +}: { + emailAccountId: string; + messageId: string; + threadId?: string; + reason?: string; +}) { + try { + // First, get the scheduled actions that will be cancelled + const actionsToCancel = await prisma.scheduledAction.findMany({ + where: { + emailAccountId, + messageId, + ...(threadId && { threadId }), + status: ScheduledActionStatus.PENDING, + }, + select: { id: true, scheduledId: true }, + }); + + if (actionsToCancel.length === 0) { + return 0; + } + + // Cancel the QStash messages first for efficiency + const client = getQstashClient(); + if (client) { + for (const action of actionsToCancel) { + if (action.scheduledId) { + try { + await cancelMessage(client, action.scheduledId); + logger.info("Cancelled QStash message", { + scheduledActionId: action.id, + scheduledId: action.scheduledId, + }); + } catch (error) { + // Log but don't fail the entire operation if QStash cancellation fails + logger.warn("Failed to cancel QStash message", { + scheduledActionId: action.id, + scheduledId: action.scheduledId, + error, + }); + } + } + } + } + + const cancelledActions = await prisma.scheduledAction.updateMany({ + where: { + emailAccountId, + messageId, + ...(threadId && { threadId }), + status: ScheduledActionStatus.PENDING, + }, + data: { + status: ScheduledActionStatus.CANCELLED, + }, + }); + + logger.info("Cancelled QStash scheduled actions", { + count: cancelledActions.count, + emailAccountId, + messageId, + threadId, + reason, + }); + + return cancelledActions.count; + } catch (error) { + logger.error("Failed to cancel QStash scheduled actions", { + error, + emailAccountId, + messageId, + threadId, + }); + throw error; + } +} + +async function scheduleMessage({ + payload, + delayInMinutes, + deduplicationId, +}: { + payload: ScheduledActionPayload; + delayInMinutes: number; + deduplicationId: string; +}) { + const client = getQstashClient(); + const url = `${env.WEBHOOK_URL || env.NEXT_PUBLIC_BASE_URL}/api/scheduled-actions/execute`; + + const notBefore = getUnixTime(addMinutes(new Date(), delayInMinutes)); + + try { + if (client) { + const response = await client.publishJSON({ + url, + body: payload, + notBefore, // Absolute delay using unix timestamp + deduplicationId, + contentBasedDeduplication: false, + headers: getCronSecretHeader(), + }); + + // The messageId here has a different meaning because it is + // the QStash identifier and not the usual messageId of the email + const messageId = + "messageId" in response ? response.messageId : undefined; + + logger.info("Successfully scheduled with QStash", { + scheduledActionId: payload.scheduledActionId, + scheduledId: messageId, + notBefore, + delayInMinutes, + deduplicationId, + }); + + return messageId; + } else { + logger.error( + "QStash client not available, scheduled action cannot be executed", + { + scheduledActionId: payload.scheduledActionId, + }, + ); + + await prisma.scheduledAction.update({ + where: { id: payload.scheduledActionId }, + data: { + schedulingStatus: "FAILED" as const, + }, + }); + + throw new Error( + "QStash client not available - scheduled action cannot be executed", + ); + } + } catch (error) { + logger.error("Failed to schedule with QStash", { + error, + scheduledActionId: payload.scheduledActionId, + deduplicationId, + }); + + await prisma.scheduledAction.update({ + where: { id: payload.scheduledActionId }, + data: { + schedulingStatus: "FAILED" as const, + }, + }); + + throw error; + } +} + +async function cancelMessage( + client: InstanceType, + messageId: string, +) { + try { + await client.http.request({ + path: ["v2", "messages", messageId], + method: "DELETE", + }); + logger.info("Successfully cancelled QStash message", { messageId }); + } catch (error) { + logger.error("Failed to cancel QStash message", { messageId, error }); + throw error; + } +} + +export async function markQStashActionAsExecuting(scheduledActionId: string) { + try { + const updatedAction = await prisma.scheduledAction.update({ + where: { + id: scheduledActionId, + status: ScheduledActionStatus.PENDING, + }, + data: { + status: ScheduledActionStatus.EXECUTING, + }, + }); + + return updatedAction; + } catch (error) { + // If update fails, the action might already be executing, completed, or cancelled + logger.warn("Failed to mark QStash action as executing", { + scheduledActionId, + error, + }); + return null; + } +}