diff --git a/apps/fastmail-eventsource-daemon/.env.example b/apps/fastmail-eventsource-daemon/.env.example new file mode 100644 index 0000000000..aaf07b3fb6 --- /dev/null +++ b/apps/fastmail-eventsource-daemon/.env.example @@ -0,0 +1,25 @@ +# Fastmail EventSource Daemon Configuration +# This daemon connects to Fastmail's EventSource API for real-time email notifications +# and forwards state changes to the main Inbox Zero application webhook. + +# Required: Database URL (same as main app) +DATABASE_URL=postgresql://user:password@localhost:5432/inbox_zero + +# Required: Main application URL where the webhook endpoint is hosted +MAIN_APP_URL=http://localhost:3000 + +# Required: Shared secret for authenticating with the webhook endpoint +# Must match FASTMAIL_WEBHOOK_SECRET in your main app +# Generate with: openssl rand -hex 32 +FASTMAIL_WEBHOOK_SECRET=your-secret-here + +# Required for token refresh (when using OAuth, not app tokens) +FASTMAIL_CLIENT_ID=your-fastmail-client-id +FASTMAIL_CLIENT_SECRET=your-fastmail-client-secret + +# Optional: How often to refresh the account list (in milliseconds) +# Default: 300000 (5 minutes) +ACCOUNT_REFRESH_INTERVAL=300000 + +# Optional: Enable debug logging +DEBUG=false diff --git a/apps/fastmail-eventsource-daemon/Dockerfile b/apps/fastmail-eventsource-daemon/Dockerfile new file mode 100644 index 0000000000..2b7a1e47ef --- /dev/null +++ b/apps/fastmail-eventsource-daemon/Dockerfile @@ -0,0 +1,25 @@ +# Fastmail EventSource Daemon +# This daemon connects to Fastmail's EventSource API for real-time email notifications + +FROM node:20-alpine + +# Install pnpm +RUN corepack enable && corepack prepare pnpm@latest --activate + +WORKDIR /app + +# Copy package files and prisma schema +COPY package.json pnpm-lock.yaml* ./ +COPY prisma ./prisma/ + +# Install dependencies (this also runs prisma generate via postinstall) +RUN pnpm install --frozen-lockfile + +# Copy source files +COPY . . + +# Build TypeScript +RUN pnpm build + +# Run the daemon +CMD ["pnpm", "start"] diff --git a/apps/fastmail-eventsource-daemon/README.md b/apps/fastmail-eventsource-daemon/README.md new file mode 100644 index 0000000000..e2860fcb08 --- /dev/null +++ b/apps/fastmail-eventsource-daemon/README.md @@ -0,0 +1,171 @@ +# Fastmail EventSource Daemon + +A lightweight daemon that connects to Fastmail's JMAP EventSource API for real-time email notifications and forwards state changes to the main Inbox Zero application. + +## Overview + +This daemon implements [JMAP EventSource](https://www.rfc-editor.org/rfc/rfc8620.html#section-7.3) (RFC 8620 section 7.3) to receive real-time push notifications from Fastmail when emails arrive or change state. + +### Architecture + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ Self-Hosted Deployment │ +├─────────────────────────────────────────────────────────────────┤ +│ │ +│ ┌─────────────────────────────────────────────────────────────┐ │ +│ │ EventSource Daemon │ │ +│ │ (this service) │ │ +│ │ │ │ +│ │ For each Fastmail account: │ │ +│ │ 1. Connect to Fastmail EventSource │ │ +│ │ 2. On state change → POST to webhook │ │ +│ └──────────────────────────┬──────────────────────────────────┘ │ +│ │ │ +│ │ POST /api/fastmail/webhook │ +│ │ { emailAccountId, newState } │ +│ ▼ │ +│ ┌─────────────────────────────────────────────────────────────┐ │ +│ │ Next.js Application │ │ +│ │ │ │ +│ │ /api/fastmail/webhook ─────► pollFastmailAccount() │ │ +│ └─────────────────────────────────────────────────────────────┘ │ +└─────────────────────────────────────────────────────────────────┘ +``` + +## Requirements + +- Node.js 20+ +- Access to the same PostgreSQL database as the main Inbox Zero app +- Network access to the main Inbox Zero app webhook endpoint + +## Configuration + +Copy `.env.example` to `.env` and configure: + +```bash +# Required: Same database as main app +DATABASE_URL=postgresql://user:password@localhost:5432/inbox_zero + +# Required: Main app URL +MAIN_APP_URL=http://localhost:3000 + +# Required: Shared secret (must match FASTMAIL_WEBHOOK_SECRET in main app) +# Generate with: openssl rand -hex 32 +FASTMAIL_WEBHOOK_SECRET=your-secret-here + +# Required for OAuth token refresh +FASTMAIL_CLIENT_ID=your-fastmail-client-id +FASTMAIL_CLIENT_SECRET=your-fastmail-client-secret + +# Optional: Refresh interval (default: 5 minutes) +ACCOUNT_REFRESH_INTERVAL=300000 + +# Optional: Enable debug logging +DEBUG=false +``` + +## Running + +### Development + +```bash +pnpm install +pnpm dev +``` + +### Production + +```bash +pnpm install +pnpm start +``` + +### Docker + +```bash +docker build -t inbox-zero-fastmail-daemon . +docker run -e DATABASE_URL=... -e MAIN_APP_URL=... -e FASTMAIL_WEBHOOK_SECRET=... inbox-zero-fastmail-daemon +``` + +### Docker Compose + +Add to your `docker-compose.yml`: + +```yaml +services: + fastmail-daemon: + build: ./apps/fastmail-eventsource-daemon + environment: + - DATABASE_URL=${DATABASE_URL} + - MAIN_APP_URL=http://web:3000 + - FASTMAIL_WEBHOOK_SECRET=${FASTMAIL_WEBHOOK_SECRET} + - FASTMAIL_CLIENT_ID=${FASTMAIL_CLIENT_ID} + - FASTMAIL_CLIENT_SECRET=${FASTMAIL_CLIENT_SECRET} + depends_on: + - web + - postgres + restart: unless-stopped +``` + +## How It Works + +1. **Startup**: The daemon queries the database for all Fastmail accounts with: + - Valid access token + - At least one enabled automation rule + - Premium tier with AI access (or self-hosted bypass) + +2. **Per Account**: For each account, it: + - Fetches the JMAP session to get the EventSource URL + - Opens a Server-Sent Events (SSE) connection to Fastmail + - Subscribes to Email state changes + +3. **On State Change**: When Fastmail sends a state change event: + - The daemon POSTs to `/api/fastmail/webhook` on the main app + - The main app triggers `pollFastmailAccount()` with `forceSync: true` + - Emails are processed through the rule engine + +4. **Periodic Refresh**: Every 5 minutes (configurable), the daemon: + - Re-queries the database for accounts + - Starts connections for new accounts + - Closes connections for removed/ineligible accounts + +## Error Handling + +- **Connection errors**: Exponential backoff reconnection (1s, 2s, 4s... up to 5 min) +- **Token expiration (401)**: Automatic token refresh using OAuth refresh token +- **Webhook failures**: Retry with exponential backoff +- **Database errors**: Logged and continued (existing connections maintained) + +## Future: PushSubscription + +When Fastmail adds support for JMAP PushSubscription (RFC 8620 section 7.2), +this daemon will become unnecessary. Fastmail will be able to POST directly +to the `/api/fastmail/webhook` endpoint, which is already designed to handle this. + +The webhook endpoint in the main app is compatible with both: +1. This EventSource daemon (current) +2. Future Fastmail PushSubscription webhooks + +## Logs + +The daemon logs to stdout/stderr. In production, capture logs with your container orchestrator or process manager. + +Example output: +``` +╔═══════════════════════════════════════════════════════════════╗ +║ Fastmail EventSource Daemon ║ +║ ║ +║ Connects to Fastmail for real-time email notifications ║ +║ and forwards state changes to the main app webhook. ║ +╚═══════════════════════════════════════════════════════════════╝ + +Main App URL: http://localhost:3000 +Debug Mode: disabled + +[2024-01-15T10:30:00.000Z] [AccountManager] Starting account manager +[2024-01-15T10:30:00.100Z] [AccountManager] Found 3 eligible Fastmail accounts +[2024-01-15T10:30:00.200Z] [AccountManager] Adding connection for user@example.com +[2024-01-15T10:30:00.500Z] [AccountManager] Connected: abc123 +[Stats] Connections: 3/3 active +``` diff --git a/apps/fastmail-eventsource-daemon/package.json b/apps/fastmail-eventsource-daemon/package.json new file mode 100644 index 0000000000..cf9fc45ae4 --- /dev/null +++ b/apps/fastmail-eventsource-daemon/package.json @@ -0,0 +1,27 @@ +{ + "name": "inbox-zero-fastmail-eventsource-daemon", + "version": "1.0.0", + "private": true, + "type": "module", + "main": "index.js", + "scripts": { + "start": "tsx --tsconfig tsconfig.json src/index.ts", + "dev": "tsx watch --tsconfig tsconfig.json src/index.ts", + "build": "prisma generate && tsc", + "db:generate": "prisma generate", + "test": "echo \"Error: no test specified\" && exit 1" + }, + "devDependencies": { + "@types/node": "24.10.1", + "prisma": "6.8.2", + "tsx": "4.21.0", + "typescript": "5.9.3" + }, + "dependencies": { + "@prisma/client": "6.8.2", + "@t3-oss/env-core": "0.13.8", + "dotenv": "17.2.3", + "eventsource": "3.0.6", + "zod": "3.25.46" + } +} diff --git a/apps/fastmail-eventsource-daemon/prisma/schema.prisma b/apps/fastmail-eventsource-daemon/prisma/schema.prisma new file mode 100644 index 0000000000..16c7a0ba3f --- /dev/null +++ b/apps/fastmail-eventsource-daemon/prisma/schema.prisma @@ -0,0 +1,1188 @@ +datasource db { + provider = "postgresql" + url = env("DATABASE_URL") + directUrl = env("DIRECT_URL") +} + +generator client { + provider = "prisma-client-js" + output = "../generated/prisma" + previewFeatures = ["driverAdapters"] +} + +// Account, User, Session, and VerificationToken based on: https://authjs.dev/reference/adapter/prisma +model Account { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + userId String + provider String + type String @default("oidc") // next-auth deprecated field + providerAccountId String + refresh_token String? @db.Text + refreshTokenExpiresAt DateTime? + access_token String? @db.Text + expires_at DateTime? @default(now()) + token_type String? + scope String? + id_token String? @db.Text + session_state String? + user User @relation(fields: [userId], references: [id], onDelete: Cascade) + emailAccount EmailAccount? + + @@unique([provider, providerAccountId]) + @@index([userId]) +} + +// not in use. we only use jwt for sessions +model Session { + id String @id @default(cuid()) + sessionToken String @unique + userId String + expires DateTime + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + ipAddress String? + userAgent String? + user User @relation(fields: [userId], references: [id], onDelete: Cascade) + + activeOrganizationId String? + activeOrganization Organization? @relation(fields: [activeOrganizationId], references: [id], onDelete: SetNull) + + @@index([userId]) + @@index([activeOrganizationId]) +} + +model User { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + name String? + email String @unique + emailVerified Boolean? @default(false) + image String? + accounts Account[] + sessions Session[] + + // additional fields + completedOnboardingAt DateTime? // questions about the user. e.g. their role + completedAppOnboardingAt DateTime? // how to use the app + onboardingAnswers Json? + lastLogin DateTime? + utms Json? + errorMessages Json? // eg. user set incorrect AI API key + + // survey answers (extracted from onboardingAnswers for easier querying) + surveyFeatures String[] // multiple choice: features user is interested in + surveyRole String? // single choice: user's role. Now using `EmailAccount.role` instead + surveyGoal String? // single choice: what user wants to achieve + surveyCompanySize Int? // numeric company size: 1 (solo), 5 (2-10), 50 (11-100), 500 (101-1000), 1000 (1000+) + surveySource String? // single choice: how user heard about Inbox Zero + surveyImprovements String? // open text: what user wants to improve + + // settings + aiProvider String? + aiModel String? + aiApiKey String? + webhookSecret String? + + // referral system + referralCode String? @unique // User's own referral code + + // premium can be shared among multiple users + premiumId String? + premium Premium? @relation(name: "userPremium", fields: [premiumId], references: [id]) + // only admin users can manage premium + premiumAdminId String? + premiumAdmin Premium? @relation(fields: [premiumAdminId], references: [id]) + + apiKeys ApiKey[] + + emailAccounts EmailAccount[] + + // Referral relationships + referralsMade Referral[] @relation("ReferrerUser") + referralReceived Referral? @relation("ReferredUser") +} + +// Migrating over to the new settings model. Currently most settings are in the User model, but will be moved to this model in the future. +model EmailAccount { + id String @id @default(cuid()) + email String @unique + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + name String? // Name associated with the Google account + image String? // Profile image URL from the Google account + + about String? + writingStyle String? + signature String? // User's email signature from provider or manually set + includeReferralSignature Boolean @default(false) + watchEmailsExpirationDate DateTime? + watchEmailsSubscriptionId String? // For Outlook subscription ID + watchEmailsSubscriptionHistory Json? // Historical Outlook subscription IDs: [{ subscriptionId, createdAt, replacedAt }] + lastSyncedHistoryId String? + lastPolledAt DateTime? // For Fastmail polling - tracks last successful poll + behaviorProfile Json? + personaAnalysis Json? // ai analysis of the user's persona + role String? // the role confirmed by the user - previously `User.surveyRole` + timezone String? // User's timezone (IANA tz database format, e.g., "America/Los_Angeles", "Asia/Jerusalem") - handles DST automatically + calendarBookingLink String? // User's calendar booking link + + statsEmailFrequency Frequency @default(WEEKLY) + summaryEmailFrequency Frequency @default(WEEKLY) + lastSummaryEmailAt DateTime? + coldEmailBlocker ColdEmailSetting? // @deprecated + coldEmailDigest Boolean @default(false) // @deprecated + coldEmailPrompt String? // @deprecated + rulesPrompt String? // @deprecated + autoCategorizeSenders Boolean @default(false) + multiRuleSelectionEnabled Boolean @default(false) + + meetingBriefingsEnabled Boolean @default(false) + meetingBriefingsMinutesBefore Int @default(240) // 4 hours in minutes + + digestSchedule Schedule? + + userId String + user User @relation(fields: [userId], references: [id], onDelete: Cascade) + accountId String @unique + account Account @relation(fields: [accountId], references: [id], onDelete: Cascade) + + labels Label[] + rules Rule[] + executedRules ExecutedRule[] + newsletters Newsletter[] + coldEmails ColdEmail[] + groups Group[] + categories Category[] + threadTrackers ThreadTracker[] + cleanupJobs CleanupJob[] + cleanupThreads CleanupThread[] + emailMessages EmailMessage[] + emailTokens EmailToken[] + knowledge Knowledge[] + chats Chat[] + digests Digest[] + scheduledActions ScheduledAction[] + responseTimes ResponseTime[] + + members Member[] + invitations Invitation[] + ssoproviders SsoProvider[] + calendarConnections CalendarConnection[] + mcpConnections McpConnection[] + meetingBriefings MeetingBriefing[] + + @@index([userId]) + @@index([lastSummaryEmailAt]) +} + +model Organization { + id String @id @default(cuid()) + name String + slug String @unique + logo String? + metadata Json? + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + members Member[] + SsoProvider SsoProvider[] + sessions Session[] + invitations Invitation[] +} + +model Member { + id String @id @default(cuid()) + organizationId String + emailAccountId String + role String @default("member") // "admin" | "member" + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + organization Organization @relation(fields: [organizationId], references: [id], onDelete: Cascade) + emailAccount EmailAccount @relation(fields: [emailAccountId], references: [id], onDelete: Cascade) + + @@unique([organizationId, emailAccountId]) + @@index([emailAccountId]) +} + +model Invitation { + id String @id @default(cuid()) + organizationId String + organization Organization @relation(fields: [organizationId], references: [id], onDelete: Cascade) + email String + role String? + status String + expiresAt DateTime + inviterId String + inviter EmailAccount @relation(fields: [inviterId], references: [id], onDelete: Cascade) + + @@index([organizationId]) + @@index([inviterId]) + @@map("invitation") +} + +model Verification { + id String @id @default(cuid()) + identifier String + value String + expiresAt DateTime + createdAt DateTime @default(now()) + updatedAt DateTime @default(now()) @updatedAt + + @@map("verification") +} + +model SsoProvider { + id String @id @default(cuid()) + issuer String + oidcConfig String? + samlConfig String? + emailAccountId String? + emailAccount EmailAccount? @relation(fields: [emailAccountId], references: [id], onDelete: Cascade) + providerId String @unique + organizationId String? + organization Organization? @relation(fields: [organizationId], references: [id], onDelete: Cascade) + domain String + + @@index([emailAccountId]) + @@index([organizationId]) + @@map("ssoProvider") +} + +model Digest { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + emailAccountId String + emailAccount EmailAccount @relation(fields: [emailAccountId], references: [id], onDelete: Cascade) + items DigestItem[] + sentAt DateTime? + status DigestStatus @default(PENDING) + + @@index([emailAccountId]) +} + +model DigestItem { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + messageId String + threadId String + content String @db.Text + digestId String + digest Digest @relation(fields: [digestId], references: [id], onDelete: Cascade) + actionId String? + action ExecutedAction? @relation(fields: [actionId], references: [id], onDelete: Cascade) + coldEmailId String? + coldEmail ColdEmail? @relation(fields: [coldEmailId], references: [id]) + + @@unique([digestId, threadId, messageId]) + @@index([actionId]) + @@index([coldEmailId]) +} + +model Schedule { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + intervalDays Int? // Total interval in days + occurrences Int? // Number of times within the interval + + // Bit mask for days of week (0b0000000 to 0b1111111) + // Each bit represents a day (Sunday to Saturday) + // e.g., 0b1000001 means Sunday and Saturday + daysOfWeek Int? // 0-127 (2^7 - 1) + + // Time of day stored as DateTime with canonical date (1970-01-01) + // Only the time portion is used, but DateTime preserves timezone info + // Example: "1970-01-01T09:30:00Z", "1970-01-01T14:15:00Z" + timeOfDay DateTime? + + emailAccountId String + emailAccount EmailAccount @relation(fields: [emailAccountId], references: [id], onDelete: Cascade) + + lastOccurrenceAt DateTime? + nextOccurrenceAt DateTime? + + @@unique([emailAccountId]) +} + +model Premium { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + users User[] @relation(name: "userPremium") + admins User[] + + pendingInvites String[] + + // lemon squeezy + lemonSqueezyRenewsAt DateTime? + lemonSqueezyCustomerId Int? + lemonSqueezySubscriptionId Int? + lemonSqueezySubscriptionItemId Int? + lemonSqueezyOrderId Int? // lifetime purchase is an order and not a subscription + lemonSqueezyProductId Int? + lemonSqueezyVariantId Int? + lemonLicenseKey String? + lemonLicenseInstanceId String? + lemonSubscriptionStatus String? + + // stripe + stripeCustomerId String? @unique + stripeSubscriptionId String? @unique + stripeSubscriptionItemId String? @unique + stripePriceId String? + stripeProductId String? + stripeSubscriptionStatus String? // The current status from Stripe (e.g., 'active', 'trialing', 'past_due', 'canceled', 'unpaid'). + stripeCancelAtPeriodEnd Boolean? // If true, the subscription is set to cancel automatically at the end of the current billing period, rather than renew. + stripeRenewsAt DateTime? // Timestamp for when the current billing period ends and the subscription attempts renewal (if not canceling). Derived from `current_period_end`. + stripeTrialEnd DateTime? // Timestamp for when the free trial period ends (if applicable). Important for managing trial-to-paid transitions. + stripeCanceledAt DateTime? // Timestamp for when the subscription was definitively marked as canceled in Stripe (might be immediate or after period end). Historical data. + stripeEndedAt DateTime? // Timestamp for when the subscription ended permanently for any reason (cancellation, final payment failure). Historical data. + + tier PremiumTier? + + emailAccountsAccess Int? + + // unsubscribe/ai credits + // if `unsubscribeMonth` not set to this month, set to current month + // reset `unsubscribeCredits` each time month is changed + unsubscribeMonth Int? // 1-12 + unsubscribeCredits Int? + aiMonth Int? // 1-12 + aiCredits Int? + + // Payment history + payments Payment[] + + @@index([pendingInvites]) +} + +// not in use as it's only used for passwordless login +model VerificationToken { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + identifier String + token String @unique + expires DateTime + + @@unique([identifier, token]) +} + +model Label { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + gmailLabelId String + name String + description String? // used in prompts + enabled Boolean @default(true) + + emailAccountId String + emailAccount EmailAccount @relation(fields: [emailAccountId], references: [id], onDelete: Cascade) + + @@unique([gmailLabelId, emailAccountId]) + @@unique([name, emailAccountId]) +} + +model Rule { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + name String + actions Action[] + enabled Boolean @default(true) + automate Boolean @default(true) // @deprecated - No longer used. All rules are now automated. Kept for historical data only. + runOnThreads Boolean @default(false) // if disabled, only runs on individual emails + + emailAccountId String + emailAccount EmailAccount @relation(fields: [emailAccountId], references: [id], onDelete: Cascade) + + executedRules ExecutedRule[] + + // conditions: ai, group, static, category + conditionalOperator LogicalOperator @default(AND) + + // ai conditions + instructions String? + + // group condition + groupId String? @unique + group Group? @relation(fields: [groupId], references: [id], onDelete: Cascade) + + // static condition + // automatically apply this rule if it matches a filter. supports regex + from String? + to String? + subject String? + body String? + + // category condition + // only apply to (or do not apply to) senders in these categories + categoryFilterType CategoryFilterType? // deprecated + categoryFilters Category[] // deprecated + + systemType SystemType? + + promptText String? // natural language for this rule for prompt file. prompt file is combination of these fields + + history RuleHistory[] + + @@unique([name, emailAccountId]) + @@unique([emailAccountId, systemType]) +} + +model Action { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + type ActionType + ruleId String + rule Rule @relation(fields: [ruleId], references: [id], onDelete: Cascade) + + label String? // labelName - labelId is the source of truth, and we use it when set + labelId String? // Stable ID: Label ID (Gmail) or Category ID (Outlook) + subject String? + content String? + to String? + cc String? + bcc String? + url String? + folderName String? + folderId String? + delayInMinutes Int? + + @@index([ruleId]) +} + +model RuleHistory { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + ruleId String + rule Rule @relation(fields: [ruleId], references: [id], onDelete: Cascade) + version Int + triggerType String // "ai_update" (AI), "manual_update" (user), "ai_creation" (AI), "manual_creation" (user), "system_creation" (system), "system_update" (system) + promptText String? // The prompt text that generated this version + + name String + instructions String? + enabled Boolean + automate Boolean + runOnThreads Boolean + conditionalOperator String + from String? + to String? + subject String? + body String? + categoryFilterType String? // deprecated + systemType String? + + actions Json + categoryFilters Json? // deprecated + + @@unique([ruleId, version]) + @@index([ruleId, createdAt]) +} + +// Rule/Action models represent the rules and actions that the AI can take. +// ExecutedRule/ExecutedAction models represent the rules/actions that have been planned or executed by the AI. +model ExecutedRule { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + threadId String + messageId String + status ExecutedRuleStatus + automated Boolean + reason String? + matchMetadata Json? // Stores structured match information (e.g., learned patterns, match types) + + // may be null if the rule was deleted + ruleId String? + rule Rule? @relation(fields: [ruleId], references: [id]) + + // storing user here in case rule was deleted + emailAccountId String + emailAccount EmailAccount @relation(fields: [emailAccountId], references: [id], onDelete: Cascade) + + actionItems ExecutedAction[] + scheduledActions ScheduledAction[] + + @@index([emailAccountId, threadId, messageId, ruleId]) + @@index([emailAccountId, messageId]) + @@index([emailAccountId, status, createdAt]) +} + +model ExecutedAction { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + type ActionType + executedRuleId String + executedRule ExecutedRule @relation(fields: [executedRuleId], references: [id], onDelete: Cascade) + + // optional extra fields to be used with the action + label String? + labelId String? // Stable ID: Label ID (Gmail) or Category ID (Outlook) + subject String? + content String? + to String? + cc String? + bcc String? + url String? + folderName String? + folderId String? + + // additional fields as a result of the action + draftId String? // Gmail draft ID created by DRAFT_EMAIL action + wasDraftSent Boolean? // Tracks if the corresponding draft was sent (true) or ignored/superseded (false) + draftSendLog DraftSendLog? // Will exist if the draft was sent + digestItems DigestItem[] // Relation to digest items created by this action + scheduledAction ScheduledAction? // Reverse relation for delayed actions + + @@index([executedRuleId]) +} + +model ScheduledAction { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + executedRuleId String + actionType ActionType + messageId String + threadId String + scheduledFor DateTime + emailAccountId String + status ScheduledActionStatus @default(PENDING) + schedulingStatus SchedulingStatus @default(PENDING) + + label String? + labelId String? // Stable ID: Label ID (Gmail) or Category ID (Outlook) + subject String? + content String? + to String? + cc String? + bcc String? + url String? + folderName String? + folderId String? + scheduledId String? + + executedAt DateTime? + executedActionId String? @unique + + executedRule ExecutedRule @relation(fields: [executedRuleId], references: [id], onDelete: Cascade) + executedAction ExecutedAction? @relation(fields: [executedActionId], references: [id], onDelete: Cascade) + emailAccount EmailAccount @relation(fields: [emailAccountId], references: [id], onDelete: Cascade) + + @@index([executedRuleId]) + @@index([status, scheduledFor]) + @@index([emailAccountId, messageId]) +} + +// Notes: +// In the past groups stood on their own. Now they are attached to a rule. +// A group without a rule does not do anything anymore. I may delete all detached groups in the future, and then make rule required +// "Prompt" is no longer in use. It was used to generate the group, but now it's based on the rule the group is attached to. +// "Name" is no longer in use although still required. +// If we really wanted we could remove Group and just have a relation between Rule and GroupItem, but leaving as is for now. +model Group { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + name String + prompt String? + + items GroupItem[] + rule Rule? + + emailAccountId String + emailAccount EmailAccount @relation(fields: [emailAccountId], references: [id], onDelete: Cascade) + + @@unique([name, emailAccountId]) +} + +model GroupItem { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + groupId String? + group Group? @relation(fields: [groupId], references: [id], onDelete: Cascade) + type GroupItemType + value String // eg "@gmail.com", "matt@gmail.com", "Receipt from" + exclude Boolean @default(false) // Whether this pattern should be excluded rather than included + + @@unique([groupId, type, value]) +} + +model Category { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + name String + description String? + + emailAccountId String + emailAccount EmailAccount @relation(fields: [emailAccountId], references: [id], onDelete: Cascade) + + emailSenders Newsletter[] + rules Rule[] + + @@unique([name, emailAccountId]) +} + +// Represents a sender (`email`) that a user can unsubscribe from, +// or that our AI can mark as a cold email. +// `Newsletter` is a bad name for this. Will rename this model in the future. +model Newsletter { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + email String + status NewsletterStatus? + + // For learned patterns for rules + patternAnalyzed Boolean @default(false) + lastAnalyzedAt DateTime? + + emailAccountId String + emailAccount EmailAccount @relation(fields: [emailAccountId], references: [id], onDelete: Cascade) + + categoryId String? + category Category? @relation(fields: [categoryId], references: [id]) + + @@unique([email, emailAccountId]) + @@index([emailAccountId, status]) + @@index([categoryId]) +} + +model ColdEmail { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + fromEmail String + messageId String? + threadId String? + status ColdEmailStatus? + reason String? + + emailAccountId String + emailAccount EmailAccount @relation(fields: [emailAccountId], references: [id], onDelete: Cascade) + + digestItems DigestItem[] + + @@unique([emailAccountId, fromEmail]) + @@index([emailAccountId, status]) + @@index([emailAccountId, createdAt]) +} + +model EmailMessage { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + threadId String + messageId String + date DateTime // date of the email + from String + fromName String? // sender's display name + fromDomain String + to String + unsubscribeLink String? + read Boolean + sent Boolean + draft Boolean + inbox Boolean + + emailAccountId String + emailAccount EmailAccount @relation(fields: [emailAccountId], references: [id], onDelete: Cascade) + + @@unique([emailAccountId, threadId, messageId]) + @@index([emailAccountId, threadId]) + @@index([emailAccountId, date]) + @@index([emailAccountId, from]) + @@index([emailAccountId, fromName]) +} + +model ResponseTime { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + threadId String + sentMessageId String + receivedMessageId String + responseTimeMins Int // Denormalized: (sentAt - receivedAt) in minutes + receivedAt DateTime + sentAt DateTime + + emailAccountId String + emailAccount EmailAccount @relation(fields: [emailAccountId], references: [id], onDelete: Cascade) + + @@unique([emailAccountId, sentMessageId]) + @@index([emailAccountId, sentAt]) +} + +model ThreadTracker { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + sentAt DateTime + threadId String + messageId String + resolved Boolean @default(false) + type ThreadTrackerType + + emailAccountId String + emailAccount EmailAccount @relation(fields: [emailAccountId], references: [id], onDelete: Cascade) + + @@unique([emailAccountId, threadId, messageId]) + @@index([emailAccountId, resolved]) + @@index([emailAccountId, resolved, sentAt, type]) + @@index([emailAccountId, type, resolved, sentAt]) +} + +model CleanupJob { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + action CleanAction @default(ARCHIVE) + daysOld Int @default(7) + instructions String? + skipReply Boolean? + skipStarred Boolean? + skipCalendar Boolean? + skipReceipt Boolean? + skipAttachment Boolean? + skipConversation Boolean? + + emailAccountId String + emailAccount EmailAccount @relation(fields: [emailAccountId], references: [id], onDelete: Cascade) + + threads CleanupThread[] + + @@index([emailAccountId]) +} + +model CleanupThread { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + threadId String + archived Boolean // this can also mean "mark as read". depends on CleanupJob.action + jobId String + job CleanupJob @relation(fields: [jobId], references: [id], onDelete: Cascade) + + emailAccountId String + emailAccount EmailAccount @relation(fields: [emailAccountId], references: [id], onDelete: Cascade) + + @@index([jobId]) +} + +model Knowledge { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + title String + content String + + emailAccountId String + emailAccount EmailAccount @relation(fields: [emailAccountId], references: [id], onDelete: Cascade) + + @@unique([emailAccountId, title]) +} + +model ApiKey { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + name String? + hashedKey String @unique + isActive Boolean @default(true) + + userId String + user User @relation(fields: [userId], references: [id], onDelete: Cascade) + + @@index([userId, isActive]) +} + +model EmailToken { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + token String @unique + expiresAt DateTime + // action EmailTokenAction @default(UNSUBSCRIBE) + + emailAccountId String + emailAccount EmailAccount @relation(fields: [emailAccountId], references: [id], onDelete: Cascade) + + @@index([emailAccountId]) +} + +model Payment { + id String @id @default(cuid()) + createdAt DateTime // from processor + updatedAt DateTime // from processor + + // Relation to Premium + premiumId String? + premium Premium? @relation(fields: [premiumId], references: [id], onDelete: SetNull) + + // Payment processor information + processorType ProcessorType @default(LEMON_SQUEEZY) + processorId String? @unique // External payment ID from Stripe/Lemon Squeezy + processorSubscriptionId String? // External subscription ID + processorCustomerId String? // External customer ID + + // Core payment information + amount Int // Total amount in cents + currency String // 3-letter currency code: USD, EUR, etc. + status String // paid, failed, refunded, etc. + tax Int + taxInclusive Boolean + + // Refund information + refunded Boolean @default(false) + refundedAt DateTime? + refundedAmount Int? // in cents + + // Metadata + billingReason String? // initial, renewal, update, etc. + + @@index([premiumId]) +} + +model DraftSendLog { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + + executedActionId String @unique + executedAction ExecutedAction @relation(fields: [executedActionId], references: [id], onDelete: Cascade) + + sentMessageId String + similarityScore Float // Similarity score (0.0 to 1.0) between original draft and sent message + + @@index([executedActionId]) +} + +model Chat { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + messages ChatMessage[] + emailAccountId String + emailAccount EmailAccount @relation(fields: [emailAccountId], references: [id], onDelete: Cascade) + + @@index([emailAccountId]) +} + +model ChatMessage { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + role String + parts Json + // attachments Json? + + chatId String + chat Chat @relation(fields: [chatId], references: [id], onDelete: Cascade) + + @@index([chatId]) +} + +model CalendarConnection { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + provider String // "google" or "microsoft" + email String // Google account email (e.g., "elie@gmail.com") + accessToken String? + refreshToken String? + expiresAt DateTime? + isConnected Boolean @default(true) + + emailAccountId String + emailAccount EmailAccount @relation(fields: [emailAccountId], references: [id], onDelete: Cascade) + + calendars Calendar[] + + @@unique([emailAccountId, provider, email]) // Allow multiple Google accounts per user +} + +model Calendar { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + calendarId String // External calendar ID from provider + name String + description String? + primary Boolean @default(false) + isEnabled Boolean @default(true) + timezone String? + + connectionId String + connection CalendarConnection @relation(fields: [connectionId], references: [id], onDelete: Cascade) + + @@unique([connectionId, calendarId]) +} + +model MeetingBriefing { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + + calendarEventId String // External event ID from Google/Microsoft + eventTitle String + eventStartTime DateTime + guestCount Int + status MeetingBriefingStatus + + emailAccountId String + emailAccount EmailAccount @relation(fields: [emailAccountId], references: [id], onDelete: Cascade) + + @@unique([emailAccountId, calendarEventId]) // Prevent duplicate briefings + @@index([emailAccountId]) +} + +model Referral { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + // The user who made the referral (referrer) + referrerUserId String + referrerUser User @relation(name: "ReferrerUser", fields: [referrerUserId], references: [id], onDelete: Cascade) + + // The user who was referred + referredUserId String @unique // Each user can only be referred once + referredUser User @relation(name: "ReferredUser", fields: [referredUserId], references: [id], onDelete: Cascade) + + // The referral code used (stored as string) + referralCodeUsed String + + // Status tracking + status ReferralStatus @default(PENDING) + + // Reward tracking - using Stripe balance transactions + rewardGrantedAt DateTime? + stripeBalanceTransactionId String? // Store Stripe txn ID for reference + rewardAmount Int? // Amount in cents (e.g., 2000 for $20) + + @@index([referrerUserId]) + @@index([status]) +} + +model McpIntegration { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + name String @unique // Lookup key to MCP_INTEGRATIONS config + + // OAuth registration metadata (captured during dynamic registration) + // These represent what the credentials below were registered for + registeredServerUrl String? // Server URL credentials are registered with + registeredAuthorizationUrl String? // OAuth authorization endpoint used + registeredTokenUrl String? // OAuth token endpoint used + + // OAuth client credentials (from dynamic registration - shared across all users) + oauthClientId String? + oauthClientSecret String? + + connections McpConnection[] +} + +model McpConnection { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + name String + isActive Boolean @default(true) + + // Encrypted credentials + accessToken String? + refreshToken String? + apiKey String? + expiresAt DateTime? + + integrationId String + integration McpIntegration @relation(fields: [integrationId], references: [id], onDelete: Cascade) + emailAccountId String? + emailAccount EmailAccount? @relation(fields: [emailAccountId], references: [id], onDelete: Cascade) + tools McpTool[] + + @@unique([emailAccountId, integrationId]) +} + +model McpTool { + id String @id @default(cuid()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + name String + description String? + schema Json? + isEnabled Boolean @default(true) + + connectionId String + connection McpConnection @relation(fields: [connectionId], references: [id], onDelete: Cascade) + + @@unique([connectionId, name]) +} + +enum ActionType { + ARCHIVE + LABEL + REPLY + SEND_EMAIL + FORWARD + DRAFT_EMAIL + MARK_SPAM + CALL_WEBHOOK + MARK_READ + // TRACK_THREAD // @deprecated - No longer used. We rely on rule SystemType instead to run this. + DIGEST + MOVE_FOLDER + // SUMMARIZE + // SNOOZE + // ADD_TO_DO + // INTEGRATION // for example, add to Notion +} + +enum Frequency { + NEVER + DAILY + WEEKLY + // MONTHLY + // ANNUALLY +} + +enum NewsletterStatus { + APPROVED + UNSUBSCRIBED + AUTO_ARCHIVED +} + +enum ColdEmailStatus { + AI_LABELED_COLD + USER_REJECTED_COLD +} + +// @deprecated - No longer used +enum ColdEmailSetting { + DISABLED + LIST + LABEL + ARCHIVE_AND_LABEL + ARCHIVE_AND_READ_AND_LABEL +} + +enum PremiumTier { + BASIC_MONTHLY + BASIC_ANNUALLY + PRO_MONTHLY + PRO_ANNUALLY + BUSINESS_MONTHLY + BUSINESS_ANNUALLY + BUSINESS_PLUS_MONTHLY + BUSINESS_PLUS_ANNUALLY + COPILOT_MONTHLY + LIFETIME +} + +enum ExecutedRuleStatus { + APPLIED + APPLYING + REJECTED // @deprecated - No longer created. Kept for historical data only. + PENDING // @deprecated - No longer created. Kept for historical data only. + SKIPPED + ERROR +} + +enum GroupItemType { + FROM + SUBJECT + BODY +} + +enum CategoryFilterType { + INCLUDE + EXCLUDE +} + +enum LogicalOperator { + AND + OR +} + +enum ThreadTrackerType { + AWAITING // We're waiting for their reply + NEEDS_REPLY // We need to reply to this + NEEDS_ACTION // We need to do something else +} + +enum ProcessorType { + LEMON_SQUEEZY + STRIPE +} + +enum CleanAction { + ARCHIVE + MARK_READ +} + +enum SystemType { + // conversation trackers + TO_REPLY + FYI + AWAITING_REPLY + ACTIONED + // cold email blocker + COLD_EMAIL + // other labels + NEWSLETTER + MARKETING + CALENDAR + RECEIPT + NOTIFICATION +} + +enum ReferralStatus { + PENDING // Referral created, waiting for trial completion + COMPLETED // Referral completed and reward granted +} + +enum DigestStatus { + PENDING + PROCESSING + SENT + FAILED +} + +enum ScheduledActionStatus { + PENDING + EXECUTING + COMPLETED + FAILED + CANCELLED +} + +enum SchedulingStatus { + PENDING + SCHEDULED + FAILED +} + +enum MeetingBriefingStatus { + PENDING + SENT + FAILED + SKIPPED +} diff --git a/apps/fastmail-eventsource-daemon/src/account-manager.ts b/apps/fastmail-eventsource-daemon/src/account-manager.ts new file mode 100644 index 0000000000..a666353d15 --- /dev/null +++ b/apps/fastmail-eventsource-daemon/src/account-manager.ts @@ -0,0 +1,410 @@ +import { PrismaClient } from "../generated/prisma/client.js"; +import { env } from "./env.js"; +import { FastmailEventSourceClient } from "./eventsource-client.js"; +import { callWebhook } from "./webhook-caller.js"; + +/** + * JMAP Session URL for Fastmail. + */ +const FASTMAIL_JMAP_SESSION_URL = "https://api.fastmail.com/jmap/session"; + +/** + * JMAP OAuth token URL for Fastmail. + */ +const FASTMAIL_OAUTH_TOKEN_URL = "https://www.fastmail.com/dev/oidc/token"; + +/** + * JMAP Session response containing API URLs and account information. + */ +interface JMAPSession { + username: string; + apiUrl: string; + downloadUrl: string; + uploadUrl: string; + eventSourceUrl: string; + state: string; + accounts: Record< + string, + { + name: string; + isPersonal: boolean; + isReadOnly: boolean; + accountCapabilities: Record; + } + >; + primaryAccounts: Record; + capabilities: Record; +} + +/** + * Managed account with its EventSource client. + */ +interface ManagedAccount { + emailAccountId: string; + email: string; + client: FastmailEventSourceClient; + accessToken: string; + refreshToken: string | null; + expiresAt: Date | null; +} + +/** + * OAuth token response. + */ +interface TokenResponse { + access_token: string; + refresh_token?: string; + expires_in?: number; +} + +/** + * Manages EventSource connections for all Fastmail accounts. + */ +export class AccountManager { + private readonly prisma: PrismaClient; + private readonly connections = new Map(); + private refreshInterval: NodeJS.Timeout | null = null; + private running = false; + + constructor() { + this.prisma = new PrismaClient(); + } + + /** + * Start the account manager. + */ + async start(): Promise { + if (this.running) { + log("Already running"); + return; + } + + this.running = true; + log("Starting account manager"); + + // Initial account load + await this.refreshAccounts(); + + // Start periodic refresh + this.refreshInterval = setInterval( + () => this.refreshAccounts(), + env.ACCOUNT_REFRESH_INTERVAL, + ); + + log( + `Started with refresh interval of ${env.ACCOUNT_REFRESH_INTERVAL}ms (${env.ACCOUNT_REFRESH_INTERVAL / 1000 / 60} minutes)`, + ); + } + + /** + * Stop the account manager and close all connections. + */ + async stop(): Promise { + log("Stopping account manager"); + this.running = false; + + if (this.refreshInterval) { + clearInterval(this.refreshInterval); + this.refreshInterval = null; + } + + // Close all connections + for (const [id, managed] of this.connections) { + log(`Closing connection for ${id}`); + managed.client.close(); + } + this.connections.clear(); + + await this.prisma.$disconnect(); + log("Stopped"); + } + + /** + * Refresh the account list from the database. + * Starts new connections and removes stale ones. + */ + private async refreshAccounts(): Promise { + try { + log("Refreshing accounts from database"); + + const accounts = await this.getFastmailAccounts(); + log(`Found ${accounts.length} eligible Fastmail accounts`); + + const currentIds = new Set(this.connections.keys()); + const newIds = new Set(accounts.map((a) => a.id)); + + // Remove connections for accounts that no longer exist or are ineligible + for (const id of currentIds) { + if (!newIds.has(id)) { + log(`Removing connection for deleted/ineligible account: ${id}`); + const managed = this.connections.get(id); + managed?.client.close(); + this.connections.delete(id); + } + } + + // Add connections for new accounts + for (const account of accounts) { + if (!currentIds.has(account.id)) { + await this.addConnection(account); + } + } + } catch (error) { + log(`Error refreshing accounts: ${error}`, "error"); + } + } + + /** + * Get Fastmail accounts eligible for EventSource connections. + * These are accounts with: + * - Fastmail provider + * - Valid access token + * - Premium tier with AI access + * - At least one enabled rule + */ + private async getFastmailAccounts() { + // Query accounts similar to poll-sync.ts but simplified + // We don't need all the user/rule data, just enough to connect + return this.prisma.emailAccount.findMany({ + where: { + account: { + provider: "fastmail", + access_token: { not: null }, + }, + // Has at least one enabled rule + rules: { + some: { enabled: true }, + }, + // Has premium with AI access (simplified check) + user: { + OR: [ + // Has an active Stripe subscription + { premium: { stripeSubscriptionStatus: "active" } }, + // Has an active Lemon Squeezy subscription + { + premium: { + lemonSqueezyRenewsAt: { gt: new Date() }, + }, + }, + // Self-hosted bypass (if we're querying, assume bypass is set in main app) + // Users without premium but with AI API key + { aiApiKey: { not: null } }, + ], + }, + }, + select: { + id: true, + email: true, + account: { + select: { + access_token: true, + refresh_token: true, + expires_at: true, + }, + }, + }, + }); + } + + /** + * Add a new EventSource connection for an account. + */ + private async addConnection(account: { + id: string; + email: string; + account: { + access_token: string | null; + refresh_token: string | null; + expires_at: Date | null; + } | null; + }): Promise { + if (!account.account?.access_token) { + log(`No access token for ${account.email}`, "warn"); + return; + } + + try { + log(`Adding connection for ${account.email}`); + + // Get the JMAP session to find the eventSourceUrl + const session = await this.getJMAPSession(account.account.access_token); + if (!session) { + log(`Failed to get JMAP session for ${account.email}`, "error"); + return; + } + + const accountId = session.primaryAccounts["urn:ietf:params:jmap:mail"]; + if (!accountId) { + log(`No mail account in session for ${account.email}`, "error"); + return; + } + + const client = new FastmailEventSourceClient({ + eventSourceUrl: session.eventSourceUrl, + accessToken: account.account.access_token, + accountId, + emailAccountId: account.id, + onStateChange: (emailAccountId, newState) => { + this.handleStateChange(emailAccountId, newState); + }, + onError: (emailAccountId, error) => { + log(`EventSource error for ${emailAccountId}: ${error}`, "error"); + // Check if it's an auth error and try to refresh + if (error.message.includes("401")) { + this.handleTokenRefresh(emailAccountId); + } + }, + onConnected: (emailAccountId) => { + log(`Connected: ${emailAccountId}`); + }, + onDisconnected: (emailAccountId) => { + log(`Disconnected: ${emailAccountId}`); + }, + }); + + this.connections.set(account.id, { + emailAccountId: account.id, + email: account.email, + client, + accessToken: account.account.access_token, + refreshToken: account.account.refresh_token, + expiresAt: account.account.expires_at, + }); + + client.connect(); + } catch (error) { + log(`Failed to add connection for ${account.email}: ${error}`, "error"); + } + } + + /** + * Get the JMAP session for an account. + */ + private async getJMAPSession( + accessToken: string, + ): Promise { + try { + const response = await fetch(FASTMAIL_JMAP_SESSION_URL, { + headers: { + Authorization: `Bearer ${accessToken}`, + }, + }); + + if (!response.ok) { + log(`Failed to get JMAP session: ${response.status}`, "error"); + return null; + } + + return (await response.json()) as JMAPSession; + } catch (error) { + log(`Error fetching JMAP session: ${error}`, "error"); + return null; + } + } + + /** + * Handle a state change event from an EventSource client. + */ + private async handleStateChange( + emailAccountId: string, + newState: string, + ): Promise { + log(`State change for ${emailAccountId}: ${newState}`); + + const success = await callWebhook(emailAccountId, newState); + if (!success) { + log(`Failed to notify webhook for ${emailAccountId}`, "error"); + } + } + + /** + * Handle token refresh for an account. + */ + private async handleTokenRefresh(emailAccountId: string): Promise { + const managed = this.connections.get(emailAccountId); + if (!managed) { + log(`No managed account for ${emailAccountId}`, "warn"); + return; + } + + if (!managed.refreshToken) { + log(`No refresh token for ${emailAccountId}`, "warn"); + return; + } + + if (!env.FASTMAIL_CLIENT_ID || !env.FASTMAIL_CLIENT_SECRET) { + log("Missing Fastmail OAuth credentials for token refresh", "error"); + return; + } + + try { + log(`Refreshing token for ${emailAccountId}`); + + const response = await fetch(FASTMAIL_OAUTH_TOKEN_URL, { + method: "POST", + headers: { + "Content-Type": "application/x-www-form-urlencoded", + }, + body: new URLSearchParams({ + grant_type: "refresh_token", + refresh_token: managed.refreshToken, + client_id: env.FASTMAIL_CLIENT_ID, + client_secret: env.FASTMAIL_CLIENT_SECRET, + }), + }); + + if (!response.ok) { + log( + `Token refresh failed for ${emailAccountId}: ${response.status}`, + "error", + ); + return; + } + + const tokens = (await response.json()) as TokenResponse; + const newAccessToken = tokens.access_token; + + // Update the client with the new token + managed.accessToken = newAccessToken; + managed.refreshToken = tokens.refresh_token || managed.refreshToken; + managed.expiresAt = tokens.expires_in + ? new Date(Date.now() + tokens.expires_in * 1000) + : null; + + managed.client.updateAccessToken(newAccessToken); + + log(`Token refreshed for ${emailAccountId}`); + } catch (error) { + log(`Error refreshing token for ${emailAccountId}: ${error}`, "error"); + } + } + + /** + * Get connection stats. + */ + getStats(): { total: number; connected: number } { + let connected = 0; + for (const managed of this.connections.values()) { + if (managed.client.isConnected()) { + connected++; + } + } + return { total: this.connections.size, connected }; + } +} + +function log( + _message: string, + level: "info" | "warn" | "error" = "info", +): void { + const timestamp = new Date().toISOString(); + const _prefix = `[${timestamp}] [AccountManager]`; + + switch (level) { + case "error": + break; + case "warn": + break; + default: + } +} diff --git a/apps/fastmail-eventsource-daemon/src/env.ts b/apps/fastmail-eventsource-daemon/src/env.ts new file mode 100644 index 0000000000..24008441b4 --- /dev/null +++ b/apps/fastmail-eventsource-daemon/src/env.ts @@ -0,0 +1,16 @@ +import { createEnv } from "@t3-oss/env-core"; +import { z } from "zod"; + +export const env = createEnv({ + server: { + DATABASE_URL: z.string().url(), + MAIN_APP_URL: z.string().url(), + FASTMAIL_WEBHOOK_SECRET: z.string().min(1), + FASTMAIL_CLIENT_ID: z.string().optional(), + FASTMAIL_CLIENT_SECRET: z.string().optional(), + ACCOUNT_REFRESH_INTERVAL: z.coerce.number().default(300_000), // 5 minutes + DEBUG: z.coerce.boolean().default(false), + }, + runtimeEnv: process.env, + emptyStringAsUndefined: true, +}); diff --git a/apps/fastmail-eventsource-daemon/src/eventsource-client.ts b/apps/fastmail-eventsource-daemon/src/eventsource-client.ts new file mode 100644 index 0000000000..34a1346cf5 --- /dev/null +++ b/apps/fastmail-eventsource-daemon/src/eventsource-client.ts @@ -0,0 +1,234 @@ +import { EventSource } from "eventsource"; +import { env } from "./env.js"; + +/** + * JMAP StateChange event structure per RFC 8620 section 7.1 + */ +export interface JMAPStateChange { + "@type": "StateChange"; + changed: Record>; // accountId -> { typeName: newState } +} + +export interface EventSourceClientOptions { + eventSourceUrl: string; + accessToken: string; + accountId: string; + emailAccountId: string; + onStateChange: (emailAccountId: string, newState: string) => void; + onError?: (emailAccountId: string, error: Error) => void; + onConnected?: (emailAccountId: string) => void; + onDisconnected?: (emailAccountId: string) => void; +} + +/** + * JMAP EventSource client for Fastmail push notifications. + * + * Implements RFC 8620 section 7.3 EventSource push. + * Connects to Fastmail's EventSource URL and receives real-time + * notifications when email state changes. + */ +export class FastmailEventSourceClient { + private eventSource: EventSource | null = null; + private reconnectAttempts = 0; + private reconnectTimeout: NodeJS.Timeout | null = null; + private closed = false; + + private readonly maxReconnectAttempts = 10; + private readonly baseReconnectDelay = 1000; // 1 second + private readonly maxReconnectDelay = 300_000; // 5 minutes + + private readonly options: EventSourceClientOptions; + + constructor(options: EventSourceClientOptions) { + this.options = options; + } + + /** + * Build the EventSource URL with required parameters. + * Per RFC 8620 section 7.3: + * - types: Comma-separated list of type names to receive updates for + * - ping: Interval in seconds for server to send ping events + */ + private buildUrl(): string { + const url = new URL(this.options.eventSourceUrl); + url.searchParams.set("types", "Email"); // Only subscribe to Email changes + url.searchParams.set("ping", "60"); // Keepalive every 60 seconds + return url.toString(); + } + + /** + * Connect to the EventSource endpoint. + */ + connect(): void { + if (this.closed) { + this.log("Client is closed, not connecting"); + return; + } + + if (this.eventSource) { + this.log("Already connected, disconnecting first"); + this.disconnect(); + } + + const url = this.buildUrl(); + this.log(`Connecting to EventSource: ${url}`); + + // Create custom fetch with auth header + const accessToken = this.options.accessToken; + const customFetch: typeof fetch = (input, init) => { + const headers = new Headers(init?.headers); + headers.set("Authorization", `Bearer ${accessToken}`); + return fetch(input, { ...init, headers }); + }; + + this.eventSource = new EventSource(url, { + fetch: customFetch, + }); + + this.eventSource.onopen = () => { + this.log("EventSource connected"); + this.reconnectAttempts = 0; + this.options.onConnected?.(this.options.emailAccountId); + }; + + this.eventSource.onerror = (_event: Event) => { + this.log("EventSource error"); + + // The eventsource package auto-reconnects, but we may want custom handling + if (this.eventSource?.readyState === EventSource.CLOSED) { + this.handleDisconnect(); + } + }; + + // Listen for 'state' events (JMAP state changes) + this.eventSource.addEventListener("state", (event) => { + this.handleStateEvent(event as MessageEvent); + }); + + // Listen for 'ping' events (keepalive) + this.eventSource.addEventListener("ping", () => { + this.log("Received ping"); + }); + } + + /** + * Handle a state change event from the EventSource. + */ + private handleStateEvent(event: MessageEvent): void { + try { + const data = JSON.parse(event.data) as JMAPStateChange; + + if (data["@type"] !== "StateChange") { + this.log(`Unexpected event type: ${data["@type"]}`); + return; + } + + // Check if our account has Email state changes + const accountChanges = data.changed[this.options.accountId]; + if (!accountChanges) { + this.log("No changes for our account"); + return; + } + + const emailState = accountChanges.Email; + if (!emailState) { + this.log("No Email state change"); + return; + } + + this.log(`Email state changed to: ${emailState}`); + this.options.onStateChange(this.options.emailAccountId, emailState); + } catch (error) { + this.log(`Error parsing state event: ${error}`); + this.options.onError?.( + this.options.emailAccountId, + error instanceof Error ? error : new Error(String(error)), + ); + } + } + + /** + * Handle disconnection and schedule reconnect. + */ + private handleDisconnect(): void { + this.options.onDisconnected?.(this.options.emailAccountId); + + if (this.closed) { + this.log("Client is closed, not reconnecting"); + return; + } + + if (this.reconnectAttempts >= this.maxReconnectAttempts) { + this.log(`Max reconnect attempts (${this.maxReconnectAttempts}) reached`); + this.options.onError?.( + this.options.emailAccountId, + new Error("Max reconnect attempts reached"), + ); + return; + } + + // Exponential backoff: 1s, 2s, 4s, 8s... up to maxReconnectDelay + const delay = Math.min( + this.baseReconnectDelay * 2 ** this.reconnectAttempts, + this.maxReconnectDelay, + ); + + this.reconnectAttempts++; + this.log( + `Scheduling reconnect in ${delay}ms (attempt ${this.reconnectAttempts})`, + ); + + this.reconnectTimeout = setTimeout(() => { + this.connect(); + }, delay); + } + + /** + * Disconnect from the EventSource. + */ + disconnect(): void { + if (this.reconnectTimeout) { + clearTimeout(this.reconnectTimeout); + this.reconnectTimeout = null; + } + + if (this.eventSource) { + this.eventSource.close(); + this.eventSource = null; + } + } + + /** + * Close the client permanently (no reconnection). + */ + close(): void { + this.closed = true; + this.disconnect(); + this.log("Client closed"); + } + + /** + * Check if the client is connected. + */ + isConnected(): boolean { + return this.eventSource?.readyState === EventSource.OPEN; + } + + /** + * Update the access token (for token refresh). + */ + updateAccessToken(newToken: string): void { + const wasConnected = this.isConnected(); + this.disconnect(); + (this.options as { accessToken: string }).accessToken = newToken; + if (wasConnected) { + this.reconnectAttempts = 0; // Reset on token refresh + this.connect(); + } + } + + private log(_message: string): void { + if (env.DEBUG) { + } + } +} diff --git a/apps/fastmail-eventsource-daemon/src/index.ts b/apps/fastmail-eventsource-daemon/src/index.ts new file mode 100644 index 0000000000..19e0742310 --- /dev/null +++ b/apps/fastmail-eventsource-daemon/src/index.ts @@ -0,0 +1,79 @@ +#!/usr/bin/env node +import "dotenv/config"; +import { AccountManager } from "./account-manager.js"; + +/** + * Fastmail EventSource Daemon + * + * This daemon connects to Fastmail's JMAP EventSource API for real-time + * email notifications and forwards state changes to the main Inbox Zero + * application webhook. + * + * Architecture: + * - Connects to Fastmail EventSource for each eligible account + * - Receives real-time state change notifications + * - Calls /api/fastmail/webhook on the main app to trigger email processing + * + * This is designed for self-hosted deployments where long-running connections + * are supported. For Vercel/serverless, use the polling cron job instead. + * + * Future: When Fastmail adds PushSubscription (RFC 8620 section 7.2) support, + * this daemon will become unnecessary as Fastmail will call our webhook directly. + */ + +const _banner = ` +╔═══════════════════════════════════════════════════════════════╗ +║ Fastmail EventSource Daemon ║ +║ ║ +║ Connects to Fastmail for real-time email notifications ║ +║ and forwards state changes to the main app webhook. ║ +╚═══════════════════════════════════════════════════════════════╝ +`; + +async function main(): Promise { + const manager = new AccountManager(); + + // Handle graceful shutdown + let shuttingDown = false; + const shutdown = async (_signal: string): Promise => { + if (shuttingDown) { + return; + } + shuttingDown = true; + + try { + await manager.stop(); + process.exit(0); + } catch (_error) { + process.exit(1); + } + }; + + process.on("SIGTERM", () => shutdown("SIGTERM")); + process.on("SIGINT", () => shutdown("SIGINT")); + + // Handle uncaught errors + process.on("uncaughtException", (_error) => { + shutdown("uncaughtException"); + }); + + process.on("unhandledRejection", (_reason) => { + // Don't shutdown on unhandled rejection, just log it + }); + + // Start the account manager + try { + await manager.start(); + + // Log stats periodically + setInterval(() => { + const _stats = manager.getStats(); + }, 60_000); // Every minute + } catch (_error) { + process.exit(1); + } +} + +main().catch((_error) => { + process.exit(1); +}); diff --git a/apps/fastmail-eventsource-daemon/src/webhook-caller.ts b/apps/fastmail-eventsource-daemon/src/webhook-caller.ts new file mode 100644 index 0000000000..1994f7a8dc --- /dev/null +++ b/apps/fastmail-eventsource-daemon/src/webhook-caller.ts @@ -0,0 +1,106 @@ +import { env } from "./env.js"; + +/** + * Webhook payload sent to the main application. + */ +export interface WebhookPayload { + emailAccountId: string; + newState?: string; +} + +/** + * Maximum number of retry attempts for webhook calls. + */ +const MAX_RETRIES = 3; + +/** + * Base delay in ms for exponential backoff. + */ +const BASE_RETRY_DELAY = 1000; + +/** + * Calls the main application's Fastmail webhook endpoint. + * + * This triggers the main app to poll the Fastmail account for new emails. + * Uses exponential backoff retry on transient failures. + */ +export async function callWebhook( + emailAccountId: string, + newState?: string, +): Promise { + const webhookUrl = `${env.MAIN_APP_URL}/api/fastmail/webhook`; + const payload: WebhookPayload = { + emailAccountId, + newState, + }; + + for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) { + try { + const response = await fetch(webhookUrl, { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${env.FASTMAIL_WEBHOOK_SECRET}`, + }, + body: JSON.stringify(payload), + }); + + if (response.ok) { + log(`Webhook called successfully for ${emailAccountId}`); + return true; + } + + // Non-retryable error + if (response.status >= 400 && response.status < 500) { + const errorText = await response.text(); + log( + `Webhook call failed with ${response.status}: ${errorText}`, + "error", + ); + return false; + } + + // Retryable error (5xx) + log( + `Webhook call failed with ${response.status}, attempt ${attempt + 1}/${MAX_RETRIES + 1}`, + "warn", + ); + } catch (error) { + log( + `Webhook call error: ${error}, attempt ${attempt + 1}/${MAX_RETRIES + 1}`, + "error", + ); + } + + // Wait before retry (exponential backoff) + if (attempt < MAX_RETRIES) { + const delay = BASE_RETRY_DELAY * 2 ** attempt; + await sleep(delay); + } + } + + log(`Webhook call failed after ${MAX_RETRIES + 1} attempts`, "error"); + return false; +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +function log( + _message: string, + level: "info" | "warn" | "error" = "info", +): void { + const timestamp = new Date().toISOString(); + const _prefix = `[${timestamp}] [WebhookCaller]`; + + switch (level) { + case "error": + break; + case "warn": + break; + default: + if (env.DEBUG) { + } + } +} diff --git a/apps/fastmail-eventsource-daemon/tsconfig.json b/apps/fastmail-eventsource-daemon/tsconfig.json new file mode 100644 index 0000000000..7a4dba31e5 --- /dev/null +++ b/apps/fastmail-eventsource-daemon/tsconfig.json @@ -0,0 +1,17 @@ +{ + "compilerOptions": { + "target": "es2022", + "module": "es2022", + "moduleResolution": "bundler", + "lib": ["es2022"], + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "forceConsistentCasingInFileNames": true, + "outDir": "./dist", + "rootDir": "./src", + "resolveJsonModule": true + }, + "include": ["src/**/*"], + "exclude": ["node_modules"] +} diff --git a/apps/web/.env.example b/apps/web/.env.example index 7bb7d4c332..432c729070 100644 --- a/apps/web/.env.example +++ b/apps/web/.env.example @@ -28,6 +28,12 @@ FASTMAIL_CLIENT_ID= FASTMAIL_CLIENT_SECRET= # Fastmail feature flag (set to true to enable Fastmail account linking) NEXT_PUBLIC_FASTMAIL_ENABLED= +# Webhook secret for EventSource daemon (self-hosted only) +# Generate with: openssl rand -hex 32 +FASTMAIL_WEBHOOK_SECRET= +# For real-time Fastmail notifications (self-hosted only): +# Run the fastmail-eventsource-daemon alongside this app. +# See apps/fastmail-eventsource-daemon/README.md for setup instructions. # Authelia OIDC (optional - for self-hosted SSO) AUTHELIA_CLIENT_ID= diff --git a/apps/web/app/(app)/[emailAccountId]/assistant/settings/FastmailSyncSetting.tsx b/apps/web/app/(app)/[emailAccountId]/assistant/settings/FastmailSyncSetting.tsx new file mode 100644 index 0000000000..7a257aa2ba --- /dev/null +++ b/apps/web/app/(app)/[emailAccountId]/assistant/settings/FastmailSyncSetting.tsx @@ -0,0 +1,23 @@ +"use client"; + +import { SettingCard } from "@/components/SettingCard"; +import { FastmailSyncButton } from "@/components/FastmailSyncButton"; +import { useAccount } from "@/providers/EmailAccountProvider"; +import { isFastmailProvider } from "@/utils/email/provider-types"; + +export function FastmailSyncSetting() { + const { provider } = useAccount(); + + // Only show for Fastmail accounts + if (!isFastmailProvider(provider)) { + return null; + } + + return ( + } + /> + ); +} diff --git a/apps/web/app/(app)/[emailAccountId]/assistant/settings/SettingsTab.tsx b/apps/web/app/(app)/[emailAccountId]/assistant/settings/SettingsTab.tsx index dd5e0e1789..32cb972e39 100644 --- a/apps/web/app/(app)/[emailAccountId]/assistant/settings/SettingsTab.tsx +++ b/apps/web/app/(app)/[emailAccountId]/assistant/settings/SettingsTab.tsx @@ -7,11 +7,13 @@ import { LearnedPatternsSetting } from "@/app/(app)/[emailAccountId]/assistant/s import { PersonalSignatureSetting } from "@/app/(app)/[emailAccountId]/assistant/settings/PersonalSignatureSetting"; import { MultiRuleSetting } from "@/app/(app)/[emailAccountId]/assistant/settings/MultiRuleSetting"; import { WritingStyleSetting } from "@/app/(app)/[emailAccountId]/assistant/settings/WritingStyleSetting"; +import { FastmailSyncSetting } from "@/app/(app)/[emailAccountId]/assistant/settings/FastmailSyncSetting"; import { env } from "@/env"; export function SettingsTab() { return (
+ diff --git a/apps/web/app/(app)/[emailAccountId]/mail/BetaBanner.tsx b/apps/web/app/(app)/[emailAccountId]/mail/BetaBanner.tsx index 4957dae5e3..32ab13f757 100644 --- a/apps/web/app/(app)/[emailAccountId]/mail/BetaBanner.tsx +++ b/apps/web/app/(app)/[emailAccountId]/mail/BetaBanner.tsx @@ -4,7 +4,7 @@ import { useLocalStorage } from "usehooks-ts"; import { Banner } from "@/components/Banner"; export function BetaBanner() { - const [bannerVisible, setBannerVisible] = useLocalStorage< + const [bannerVisible, _setBannerVisible] = useLocalStorage< boolean | undefined >("mailBetaBannerVisibile", true); diff --git a/apps/web/app/api/fastmail/poll/route.ts b/apps/web/app/api/fastmail/poll/route.ts new file mode 100644 index 0000000000..b07bb9b756 --- /dev/null +++ b/apps/web/app/api/fastmail/poll/route.ts @@ -0,0 +1,48 @@ +import { NextResponse } from "next/server"; +import { hasCronSecret, hasPostCronSecret } from "@/utils/cron"; +import { withError } from "@/utils/middleware"; +import { captureException } from "@/utils/error"; +import { pollAllFastmailAccounts } from "@/utils/fastmail/poll-sync"; +import type { Logger } from "@/utils/logger"; + +export const dynamic = "force-dynamic"; +export const maxDuration = 300; // 5 minutes max + +export const GET = withError("fastmail/poll", async (request) => { + if (!hasCronSecret(request)) { + captureException(new Error("Unauthorized cron request: api/fastmail/poll")); + return new Response("Unauthorized", { status: 401 }); + } + + return pollFastmail(request.logger); +}); + +export const POST = withError("fastmail/poll", async (request) => { + if (!(await hasPostCronSecret(request))) { + captureException(new Error("Unauthorized cron request: api/fastmail/poll")); + return new Response("Unauthorized", { status: 401 }); + } + + return pollFastmail(request.logger); +}); + +async function pollFastmail(logger: Logger) { + try { + const results = await pollAllFastmailAccounts(logger); + + return NextResponse.json({ + success: true, + results, + summary: { + total: results.length, + successful: results.filter((r) => r.status === "success").length, + noChanges: results.filter((r) => r.status === "no_changes").length, + skipped: results.filter((r) => r.status === "skipped").length, + errors: results.filter((r) => r.status === "error").length, + }, + }); + } catch (error) { + logger.error("Failed to poll Fastmail accounts", { error }); + throw error; + } +} diff --git a/apps/web/app/api/fastmail/webhook/route.ts b/apps/web/app/api/fastmail/webhook/route.ts new file mode 100644 index 0000000000..02def63f10 --- /dev/null +++ b/apps/web/app/api/fastmail/webhook/route.ts @@ -0,0 +1,112 @@ +import { NextResponse } from "next/server"; +import { after } from "next/server"; +import { z } from "zod"; +import { env } from "@/env"; +import { withError } from "@/utils/middleware"; +import { captureException } from "@/utils/error"; +import { pollFastmailAccount } from "@/utils/fastmail/poll-sync"; +import type { Logger } from "@/utils/logger"; +import type { RequestWithLogger } from "@/utils/middleware"; + +export const dynamic = "force-dynamic"; +export const maxDuration = 300; // 5 minutes max + +/** + * Webhook payload schema for Fastmail push notifications. + * + * This endpoint is designed to work with: + * 1. EventSource daemon (current) - separate process POSTs state changes here + * 2. Future Fastmail PushSubscription (RFC 8620 section 7.2) - when Fastmail + * supports webhooks, they will POST directly to this endpoint + */ +const webhookPayloadSchema = z.object({ + emailAccountId: z.string().min(1), + newState: z.string().optional(), // State token from JMAP EventSource +}); + +export type FastmailWebhookPayload = z.infer; + +/** + * Validate the webhook secret from the Authorization header. + */ +function hasWebhookSecret(request: RequestWithLogger): boolean { + if (!env.FASTMAIL_WEBHOOK_SECRET) { + request.logger.error( + "FASTMAIL_WEBHOOK_SECRET not set, rejecting webhook request", + ); + return false; + } + + const authHeader = request.headers.get("authorization"); + const valid = authHeader === `Bearer ${env.FASTMAIL_WEBHOOK_SECRET}`; + + if (!valid) { + request.logger.error("Invalid webhook secret", { authHeader }); + } + + return valid; +} + +export const POST = withError("fastmail/webhook", async (request) => { + // Validate auth using dedicated webhook secret + if (!hasWebhookSecret(request)) { + captureException( + new Error("Unauthorized fastmail webhook request: api/fastmail/webhook"), + ); + return new Response("Unauthorized", { status: 401 }); + } + + // Parse and validate payload + const body = await request.json(); + const parseResult = webhookPayloadSchema.safeParse(body); + + if (!parseResult.success) { + request.logger.error("Invalid webhook payload", { + errors: parseResult.error.errors, + }); + return NextResponse.json( + { error: "Invalid payload", details: parseResult.error.errors }, + { status: 400 }, + ); + } + + const { emailAccountId, newState } = parseResult.data; + + request.logger.info("Received Fastmail webhook - acknowledging immediately", { + emailAccountId, + newState, + }); + + // Process asynchronously using after() to respond quickly + // This is important for the daemon to know the webhook was received + after(() => processWebhookAsync(emailAccountId, newState, request.logger)); + + return NextResponse.json({ success: true }); +}); + +async function processWebhookAsync( + emailAccountId: string, + newState: string | undefined, + logger: Logger, +) { + const log = logger.with({ + emailAccountId, + newState, + action: "processWebhookAsync", + }); + + try { + log.info("Processing Fastmail webhook"); + + const result = await pollFastmailAccount({ + emailAccountId, + logger: log, + forceSync: true, // Skip the 2-minute cooldown since we know there's a change + }); + + log.info("Fastmail webhook processed", { result }); + } catch (error) { + log.error("Failed to process Fastmail webhook", { error }); + captureException(error); + } +} diff --git a/apps/web/components/FastmailSyncButton.tsx b/apps/web/components/FastmailSyncButton.tsx new file mode 100644 index 0000000000..0b0ee77e8a --- /dev/null +++ b/apps/web/components/FastmailSyncButton.tsx @@ -0,0 +1,64 @@ +"use client"; + +import { useState, useCallback } from "react"; +import { RefreshCwIcon } from "lucide-react"; +import { Button } from "@/components/ui/button"; +import { toastSuccess, toastError } from "@/components/Toast"; +import { syncFastmailAction } from "@/utils/actions/fastmail-sync"; +import { useAccount } from "@/providers/EmailAccountProvider"; +import { isFastmailProvider } from "@/utils/email/provider-types"; + +export function FastmailSyncButton() { + const { emailAccountId, provider } = useAccount(); + const [isLoading, setIsLoading] = useState(false); + + const handleSync = useCallback(async () => { + if (!emailAccountId) return; + + setIsLoading(true); + try { + const result = await syncFastmailAction(emailAccountId); + + if (result?.serverError) { + toastError({ + title: "Sync failed", + description: result.serverError, + }); + } else if (result?.data) { + if (result.data.status === "no_changes") { + toastSuccess({ description: "No new emails" }); + } else { + toastSuccess({ + description: `Synced ${result.data.processedCount} new emails`, + }); + } + } + } catch (error) { + toastError({ + title: "Sync failed", + description: error instanceof Error ? error.message : "Unknown error", + }); + } finally { + setIsLoading(false); + } + }, [emailAccountId]); + + // Only show for Fastmail accounts + if (!isFastmailProvider(provider)) { + return null; + } + + return ( + + ); +} diff --git a/apps/web/env.ts b/apps/web/env.ts index 75b0c35b45..24486b6496 100644 --- a/apps/web/env.ts +++ b/apps/web/env.ts @@ -28,6 +28,7 @@ export const env = createEnv({ MICROSOFT_TENANT_ID: z.string().optional().default("common"), FASTMAIL_CLIENT_ID: z.string().optional(), FASTMAIL_CLIENT_SECRET: z.string().optional(), + FASTMAIL_WEBHOOK_SECRET: z.string().optional(), // Shared secret for EventSource daemon webhook auth AUTHELIA_CLIENT_ID: z.string().optional(), AUTHELIA_CLIENT_SECRET: z.string().optional(), AUTHELIA_ISSUER_URL: z.string().optional(), // e.g., https://auth.yourdomain.com diff --git a/apps/web/prisma/migrations/20251219094005_add_last_polled_at_for_fastmail_polling/migration.sql b/apps/web/prisma/migrations/20251219094005_add_last_polled_at_for_fastmail_polling/migration.sql new file mode 100644 index 0000000000..4b310a7921 --- /dev/null +++ b/apps/web/prisma/migrations/20251219094005_add_last_polled_at_for_fastmail_polling/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "EmailAccount" ADD COLUMN "lastPolledAt" TIMESTAMP(3); diff --git a/apps/web/prisma/schema.prisma b/apps/web/prisma/schema.prisma index a558478efa..a119794e68 100644 --- a/apps/web/prisma/schema.prisma +++ b/apps/web/prisma/schema.prisma @@ -125,6 +125,7 @@ model EmailAccount { watchEmailsSubscriptionId String? // For Outlook subscription ID watchEmailsSubscriptionHistory Json? // Historical Outlook subscription IDs: [{ subscriptionId, createdAt, replacedAt }] lastSyncedHistoryId String? + lastPolledAt DateTime? // For Fastmail polling - tracks last successful poll behaviorProfile Json? personaAnalysis Json? // ai analysis of the user's persona role String? // the role confirmed by the user - previously `User.surveyRole` diff --git a/apps/web/utils/actions/fastmail-sync.ts b/apps/web/utils/actions/fastmail-sync.ts new file mode 100644 index 0000000000..a3c829daf7 --- /dev/null +++ b/apps/web/utils/actions/fastmail-sync.ts @@ -0,0 +1,49 @@ +"use server"; + +import { actionClient } from "@/utils/actions/safe-action"; +import { pollFastmailAccount } from "@/utils/fastmail/poll-sync"; +import { isFastmailProvider } from "@/utils/email/provider-types"; +import prisma from "@/utils/prisma"; + +export const syncFastmailAction = actionClient + .metadata({ name: "syncFastmail" }) + .action(async ({ ctx: { emailAccountId, logger } }) => { + const account = await prisma.emailAccount.findUnique({ + where: { id: emailAccountId }, + select: { + id: true, + email: true, + account: { select: { provider: true } }, + }, + }); + + if (!account) { + throw new Error("Account not found"); + } + + if (!isFastmailProvider(account.account?.provider)) { + throw new Error( + "Sync is only available for Fastmail accounts. Gmail and Outlook use push notifications.", + ); + } + + const result = await pollFastmailAccount({ + emailAccountId, + logger: logger.with({ emailAccountId, email: account.email }), + forceSync: true, // User-triggered sync should bypass rate limiting + }); + + if (result.status === "error") { + throw new Error(result.error || "Unknown error during sync"); + } + + return { + success: true, + status: result.status, + processedCount: result.processedCount || 0, + message: + result.status === "no_changes" + ? "No new emails found" + : `Processed ${result.processedCount} new emails`, + }; + }); diff --git a/apps/web/utils/ai/report/analyze-email-behavior.ts b/apps/web/utils/ai/report/analyze-email-behavior.ts index b8aaef5f7b..82d484bfa8 100644 --- a/apps/web/utils/ai/report/analyze-email-behavior.ts +++ b/apps/web/utils/ai/report/analyze-email-behavior.ts @@ -5,7 +5,7 @@ import type { EmailSummary } from "@/utils/ai/report/summarize-emails"; import { createScopedLogger } from "@/utils/logger"; import { getModel } from "@/utils/llms/model"; -const logger = createScopedLogger("email-report-email-behavior"); +const _logger = createScopedLogger("email-report-email-behavior"); const emailBehaviorSchema = z.object({ timingPatterns: z.object({ diff --git a/apps/web/utils/ai/report/analyze-label-optimization.ts b/apps/web/utils/ai/report/analyze-label-optimization.ts index 7e2dd60f11..798738c243 100644 --- a/apps/web/utils/ai/report/analyze-label-optimization.ts +++ b/apps/web/utils/ai/report/analyze-label-optimization.ts @@ -6,7 +6,7 @@ import type { EmailSummary } from "@/utils/ai/report/summarize-emails"; import { createScopedLogger } from "@/utils/logger"; import { getModel } from "@/utils/llms/model"; -const logger = createScopedLogger("email-report-label-analysis"); +const _logger = createScopedLogger("email-report-label-analysis"); const labelAnalysisSchema = z.object({ optimizationSuggestions: z.array( diff --git a/apps/web/utils/ai/report/generate-actionable-recommendations.ts b/apps/web/utils/ai/report/generate-actionable-recommendations.ts index 53add316d9..491e83eec9 100644 --- a/apps/web/utils/ai/report/generate-actionable-recommendations.ts +++ b/apps/web/utils/ai/report/generate-actionable-recommendations.ts @@ -6,7 +6,7 @@ import type { EmailSummary } from "@/utils/ai/report/summarize-emails"; import { createScopedLogger } from "@/utils/logger"; import { getModel } from "@/utils/llms/model"; -const logger = createScopedLogger("email-report-actionable-recommendations"); +const _logger = createScopedLogger("email-report-actionable-recommendations"); const actionableRecommendationsSchema = z.object({ immediateActions: z.array( diff --git a/apps/web/utils/ai/report/generate-executive-summary.ts b/apps/web/utils/ai/report/generate-executive-summary.ts index 89bf6c1d3c..24bc29237d 100644 --- a/apps/web/utils/ai/report/generate-executive-summary.ts +++ b/apps/web/utils/ai/report/generate-executive-summary.ts @@ -6,7 +6,7 @@ import type { EmailSummary } from "@/utils/ai/report/summarize-emails"; import { createScopedLogger } from "@/utils/logger"; import { getModel } from "@/utils/llms/model"; -const logger = createScopedLogger("email-report-executive-summary"); +const _logger = createScopedLogger("email-report-executive-summary"); const executiveSummarySchema = z.object({ userProfile: z.object({ diff --git a/apps/web/utils/ai/report/response-patterns.ts b/apps/web/utils/ai/report/response-patterns.ts index 3ddc172ac4..8307687fe2 100644 --- a/apps/web/utils/ai/report/response-patterns.ts +++ b/apps/web/utils/ai/report/response-patterns.ts @@ -5,7 +5,7 @@ import type { EmailAccountWithAI } from "@/utils/llms/types"; import { createScopedLogger } from "@/utils/logger"; import { getModel } from "@/utils/llms/model"; -const logger = createScopedLogger("email-report-response-patterns"); +const _logger = createScopedLogger("email-report-response-patterns"); const responsePatternsSchema = z.object({ commonResponses: z.array( diff --git a/apps/web/utils/email/fastmail.ts b/apps/web/utils/email/fastmail.ts index 2d680c21cf..615f2222a1 100644 --- a/apps/web/utils/email/fastmail.ts +++ b/apps/web/utils/email/fastmail.ts @@ -1,15 +1,9 @@ import type { ParsedMessage } from "@/utils/types"; import type { FastmailClient, - JMAPMethodCall, JMAPMethodResponse, - JMAPError, -} from "@/utils/fastmail/client"; -import { - getAccessTokenFromClient, - getJMAPError, - isJMAPError, } from "@/utils/fastmail/client"; +import { getAccessTokenFromClient } from "@/utils/fastmail/client"; import { FastmailMailbox } from "@/utils/fastmail/constants"; import type { InboxZeroLabel } from "@/utils/label"; import type { ThreadsQuery } from "@/app/api/threads/validation"; @@ -168,6 +162,20 @@ interface JMAPSetResponse { notDestroyed?: Record; } +/** + * JMAP /changes response for tracking incremental updates + * @see https://jmap.io/spec-core.html#changes + */ +interface JMAPChangesResponse { + accountId: string; + oldState: string; + newState: string; + hasMoreChanges: boolean; + created: string[]; + updated: string[]; + destroyed: string[]; +} + // Helper to extract typed response data from JMAP method responses // JMAP responses are [methodName, data, callId] tuples where data structure varies by method // biome-ignore lint/suspicious/noExplicitAny: JMAP response types are complex and vary by method @@ -548,14 +556,14 @@ export class FastmailProvider implements EmailProvider { const log = this.logger.with({ action: "getThread", threadId }); try { - // Get all emails in the thread + // Get thread to retrieve email IDs, then fetch emails + // Note: JMAP does not support "inThread" filter - must use Thread/get const response = await this.client.request([ [ - "Email/query", + "Thread/get", { accountId: this.client.accountId, - filter: { inThread: threadId }, - sort: [{ property: "receivedAt", isAscending: true }], + ids: [threadId], }, "0", ], @@ -565,8 +573,8 @@ export class FastmailProvider implements EmailProvider { accountId: this.client.accountId, "#ids": { resultOf: "0", - name: "Email/query", - path: "/ids", + name: "Thread/get", + path: "/list/*/emailIds", }, properties: [...EMAIL_PROPERTIES], fetchAllBodyValues: true, @@ -1109,7 +1117,7 @@ export class FastmailProvider implements EmailProvider { async bulkArchiveFromSenders( fromEmails: string[], - ownerEmail: string, + _ownerEmail: string, _emailAccountId: string, ): Promise { const log = this.logger.with({ @@ -2551,7 +2559,7 @@ export class FastmailProvider implements EmailProvider { * @returns Object with base64-encoded data and size in bytes */ async getAttachment( - messageId: string, + _messageId: string, attachmentId: string, ): Promise<{ data: string; size: number }> { // JMAP uses blob download URL @@ -2823,6 +2831,120 @@ export class FastmailProvider implements EmailProvider { })); } + /** + * Get email changes since a given state token. + * Uses JMAP's Email/changes endpoint for efficient incremental sync. + * @param sinceState - The state token from the last sync (stored in lastSyncedHistoryId) + * @returns Object containing new state, arrays of created/updated/destroyed IDs, and hasMoreChanges flag + */ + async getEmailChanges(sinceState: string | null): Promise<{ + newState: string; + created: string[]; + updated: string[]; + destroyed: string[]; + hasMoreChanges: boolean; + }> { + this.logger.info("Getting email changes", { sinceState }); + + // If no previous state, get current state only (first sync) + if (!sinceState) { + this.logger.info("No previous state, fetching current state"); + const response = await this.client.request([ + [ + "Email/get", + { + accountId: this.client.accountId, + ids: [], + properties: ["id"], + }, + "0", + ], + ]); + + const result = getResponseData>( + response.methodResponses[0], + ); + + return { + newState: result.state, + created: [], + updated: [], + destroyed: [], + hasMoreChanges: false, + }; + } + + try { + const response = await this.client.request([ + [ + "Email/changes", + { + accountId: this.client.accountId, + sinceState, + maxChanges: 100, // Limit to avoid overwhelming processing + }, + "0", + ], + ]); + + const changes = getResponseData( + response.methodResponses[0], + ); + + this.logger.info("Got email changes", { + created: changes.created.length, + updated: changes.updated.length, + destroyed: changes.destroyed.length, + hasMoreChanges: changes.hasMoreChanges, + }); + + return { + newState: changes.newState, + created: changes.created, + updated: changes.updated, + destroyed: changes.destroyed, + hasMoreChanges: changes.hasMoreChanges, + }; + } catch (error) { + // Handle state mismatch errors (cannotCalculateChanges) + if ( + isJMAPErrorType(error, "cannotCalculateChanges") || + isJMAPErrorType(error, "invalidState") + ) { + this.logger.warn( + "State is too old or invalid, need to re-sync from scratch", + { error }, + ); + // Return empty changes with a flag to indicate full resync needed + // The caller should reset the state and start fresh + const response = await this.client.request([ + [ + "Email/get", + { + accountId: this.client.accountId, + ids: [], + properties: ["id"], + }, + "0", + ], + ]); + + const result = getResponseData>( + response.methodResponses[0], + ); + + return { + newState: result.state, + created: [], + updated: [], + destroyed: [], + hasMoreChanges: false, + }; + } + throw error; + } + } + async processHistory(_options: { emailAddress: string; historyId?: number; diff --git a/apps/web/utils/email/provider-types.ts b/apps/web/utils/email/provider-types.ts index 4e20d9d22d..0e6970d375 100644 --- a/apps/web/utils/email/provider-types.ts +++ b/apps/web/utils/email/provider-types.ts @@ -9,3 +9,11 @@ export function isMicrosoftProvider(provider: string | null | undefined) { export function isFastmailProvider(provider: string | null | undefined) { return provider === "fastmail"; } + +export function supportsServerFilters(provider: string | null | undefined) { + return provider === "google" || provider === "microsoft"; +} + +export function supportsPushNotifications(provider: string | null | undefined) { + return provider === "google" || provider === "microsoft"; +} diff --git a/apps/web/utils/email/watch-manager.ts b/apps/web/utils/email/watch-manager.ts index e82544a805..375dc2f2fb 100644 --- a/apps/web/utils/email/watch-manager.ts +++ b/apps/web/utils/email/watch-manager.ts @@ -6,7 +6,10 @@ import { captureException } from "@/utils/error"; import { cleanupInvalidTokens } from "@/utils/auth/cleanup-invalid-tokens"; import type { EmailProvider } from "@/utils/email/types"; import { createManagedOutlookSubscription } from "@/utils/outlook/subscription-manager"; -import { isMicrosoftProvider } from "@/utils/email/provider-types"; +import { + isMicrosoftProvider, + isFastmailProvider, +} from "@/utils/email/provider-types"; export type WatchEmailAccountResult = | { @@ -211,6 +214,19 @@ async function watchEmails({ logger.info("Watching emails"); try { + // Fastmail doesn't support webhooks for third-party apps + // It uses polling via /api/fastmail/poll cron job instead + if (isFastmailProvider(provider.name)) { + logger.info( + "Fastmail uses polling instead of webhooks - skipping watch setup", + ); + // Return a far-future expiration to prevent watch-manager from treating this as an error + // The actual polling is handled by the /api/fastmail/poll cron job + const pollingExpiration = new Date(); + pollingExpiration.setFullYear(pollingExpiration.getFullYear() + 10); + return { success: true, expirationDate: pollingExpiration }; + } + if (isMicrosoftProvider(provider.name)) { const result = await createManagedOutlookSubscription({ emailAccountId, diff --git a/apps/web/utils/fastmail/poll-sync.ts b/apps/web/utils/fastmail/poll-sync.ts new file mode 100644 index 0000000000..20bd290887 --- /dev/null +++ b/apps/web/utils/fastmail/poll-sync.ts @@ -0,0 +1,332 @@ +import prisma from "@/utils/prisma"; +import { hasAiAccess, getPremiumUserFilter } from "@/utils/premium"; +import { createEmailProvider } from "@/utils/email/provider"; +import { processHistoryItem } from "@/utils/webhook/process-history-item"; +import type { FastmailProvider } from "@/utils/email/fastmail"; +import type { Logger } from "@/utils/logger"; + +export interface PollSyncResult { + emailAccountId: string; + email: string; + status: "success" | "error" | "skipped" | "no_changes"; + processedCount?: number; + newState?: string; + error?: string; +} + +/** + * Get Fastmail accounts that are eligible for polling + */ +async function getFastmailAccountsToPoll() { + return prisma.emailAccount.findMany({ + where: { + account: { + provider: "fastmail", + }, + ...getPremiumUserFilter(), + }, + select: { + id: true, + email: true, + lastSyncedHistoryId: true, + lastPolledAt: true, + autoCategorizeSenders: true, + about: true, + multiRuleSelectionEnabled: true, + timezone: true, + calendarBookingLink: true, + account: { + select: { + provider: true, + access_token: true, + refresh_token: true, + expires_at: true, + }, + }, + rules: { + where: { enabled: true }, + include: { actions: true }, + }, + user: { + select: { + id: true, + aiProvider: true, + aiModel: true, + aiApiKey: true, + premium: { + select: { + tier: true, + lemonSqueezyRenewsAt: true, + stripeSubscriptionStatus: true, + }, + }, + }, + }, + }, + orderBy: { + lastPolledAt: { sort: "asc", nulls: "first" }, + }, + }); +} + +/** + * Poll a single Fastmail account for new emails and process them through the rule engine + */ +export async function pollFastmailAccount({ + emailAccountId, + logger, + forceSync = false, +}: { + emailAccountId: string; + logger: Logger; + forceSync?: boolean; +}): Promise { + const log = logger.with({ emailAccountId, action: "pollFastmailAccount" }); + + try { + const account = await prisma.emailAccount.findUnique({ + where: { id: emailAccountId }, + select: { + id: true, + email: true, + lastSyncedHistoryId: true, + lastPolledAt: true, + autoCategorizeSenders: true, + about: true, + multiRuleSelectionEnabled: true, + timezone: true, + calendarBookingLink: true, + account: { + select: { + provider: true, + access_token: true, + refresh_token: true, + expires_at: true, + }, + }, + rules: { + where: { enabled: true }, + include: { actions: true }, + }, + user: { + select: { + id: true, + aiProvider: true, + aiModel: true, + aiApiKey: true, + premium: { + select: { + tier: true, + lemonSqueezyRenewsAt: true, + stripeSubscriptionStatus: true, + }, + }, + }, + }, + }, + }); + + if (!account) { + return { + emailAccountId, + email: "", + status: "error", + error: "Account not found", + }; + } + + if (account.account?.provider !== "fastmail") { + return { + emailAccountId, + email: account.email, + status: "skipped", + error: "Not a Fastmail account", + }; + } + + // Skip accounts polled recently (< 2 minutes ago) during cron unless forced + if (!forceSync && account.lastPolledAt) { + const timeSinceLastPoll = Date.now() - account.lastPolledAt.getTime(); + if (timeSinceLastPoll < 2 * 60 * 1000) { + log.info("Skipping recently polled account", { + lastPolledAt: account.lastPolledAt, + timeSinceLastPoll, + }); + return { + emailAccountId, + email: account.email, + status: "skipped", + error: "Recently polled", + }; + } + } + + // Check if user has AI access + const userHasAiAccess = hasAiAccess( + account.user.premium?.tier || null, + account.user.aiApiKey, + ); + + if (!userHasAiAccess) { + log.info("User does not have AI access"); + return { + emailAccountId, + email: account.email, + status: "skipped", + error: "No AI access", + }; + } + + // Check if user has rules + const hasAutomationRules = account.rules.length > 0; + if (!hasAutomationRules) { + log.info("User has no enabled rules"); + return { + emailAccountId, + email: account.email, + status: "skipped", + error: "No rules enabled", + }; + } + + // Check for tokens (refresh_token is optional for app token accounts) + if (!account.account?.access_token) { + log.error("Missing access token"); + return { + emailAccountId, + email: account.email, + status: "error", + error: "Missing authentication tokens", + }; + } + + log.info("Creating Fastmail provider"); + + const provider = (await createEmailProvider({ + emailAccountId, + provider: "fastmail", + logger: log, + })) as FastmailProvider; + + // Get changes since last state + const sinceState = account.lastSyncedHistoryId; + log.info("Getting email changes", { sinceState }); + + const changes = await provider.getEmailChanges(sinceState); + + if (changes.created.length === 0 && changes.updated.length === 0) { + log.info("No new emails found"); + // Update lastPolledAt even when no changes + await prisma.emailAccount.update({ + where: { id: emailAccountId }, + data: { + lastPolledAt: new Date(), + lastSyncedHistoryId: changes.newState, + }, + }); + return { + emailAccountId, + email: account.email, + status: "no_changes", + newState: changes.newState, + }; + } + + log.info("Processing new emails", { + created: changes.created.length, + updated: changes.updated.length, + }); + + // Process new emails through the rule engine + let processedCount = 0; + for (const messageId of changes.created) { + try { + log.info("Processing message", { messageId }); + await processHistoryItem( + { messageId }, + { + provider, + emailAccount: { + ...account, + userId: account.user.id, + user: account.user, + }, + hasAutomationRules, + hasAiAccess: userHasAiAccess, + rules: account.rules, + logger: log.with({ messageId }), + }, + ); + processedCount++; + } catch (error) { + log.error("Error processing message", { messageId, error }); + } + } + + // Update state and polling timestamp + await prisma.emailAccount.update({ + where: { id: emailAccountId }, + data: { + lastSyncedHistoryId: changes.newState, + lastPolledAt: new Date(), + }, + }); + + // Handle pagination if there are more changes + if (changes.hasMoreChanges) { + log.info("More changes available, will be processed on next poll"); + } + + log.info("Completed polling", { + processedCount, + newState: changes.newState, + }); + + return { + emailAccountId, + email: account.email, + status: "success", + processedCount, + newState: changes.newState, + }; + } catch (error) { + log.error("Error polling Fastmail account", { error }); + return { + emailAccountId, + email: "", + status: "error", + error: error instanceof Error ? error.message : String(error), + }; + } +} + +/** + * Poll all Fastmail accounts for new emails + */ +export async function pollAllFastmailAccounts( + logger: Logger, +): Promise { + const fastmailAccounts = await getFastmailAccountsToPoll(); + + logger.info("Polling Fastmail accounts", { count: fastmailAccounts.length }); + + const results: PollSyncResult[] = []; + + for (const account of fastmailAccounts) { + const result = await pollFastmailAccount({ + emailAccountId: account.id, + logger: logger.with({ email: account.email }), + }); + results.push(result); + } + + logger.info("Completed polling all Fastmail accounts", { + total: results.length, + successful: results.filter((r) => r.status === "success").length, + noChanges: results.filter((r) => r.status === "no_changes").length, + skipped: results.filter((r) => r.status === "skipped").length, + errors: results.filter((r) => r.status === "error").length, + }); + + return results; +} diff --git a/apps/web/utils/outlook/subscription-manager.ts b/apps/web/utils/outlook/subscription-manager.ts index d046efdd1e..9edc7e9197 100644 --- a/apps/web/utils/outlook/subscription-manager.ts +++ b/apps/web/utils/outlook/subscription-manager.ts @@ -1,5 +1,4 @@ import prisma from "@/utils/prisma"; -import { createScopedLogger } from "@/utils/logger"; import { captureException } from "@/utils/error"; import type { EmailProvider } from "@/utils/email/types"; import { createEmailProvider } from "@/utils/email/provider"; diff --git a/apps/web/utils/schedule.test.ts b/apps/web/utils/schedule.test.ts index d55ed86b0c..42818ffc52 100644 --- a/apps/web/utils/schedule.test.ts +++ b/apps/web/utils/schedule.test.ts @@ -765,7 +765,7 @@ describe("calculateNextScheduleDate - Bug Fix Tests", () => { }; // Simulate the bug: if we used current time (12:36 PM) instead of lastOccurrenceAt - const currentTime = createTestDate("2024-01-15T12:36:00Z"); // 12:36 PM + const _currentTime = createTestDate("2024-01-15T12:36:00Z"); // 12:36 PM // With the fix: should calculate from lastOccurrenceAt (midnight) const result = calculateNextScheduleDate(schedule); @@ -819,7 +819,7 @@ describe("calculateNextScheduleDate - Bug Fix Tests", () => { it("should prevent schedule drift in digest processing scenario", () => { // Simulate the exact scenario from the bug report const originalScheduledTime = createTestDate("2024-01-15T00:00:00Z"); // Midnight - const processingTime = createTestDate("2024-01-15T12:36:40Z"); // 12:36 PM (when cron processed it) + const _processingTime = createTestDate("2024-01-15T12:36:40Z"); // 12:36 PM (when cron processed it) const schedule = { intervalDays: 1, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f0077150d9..f2255a6bfa 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -46,6 +46,37 @@ importers: specifier: 5.3.3 version: 5.3.3(@inquirer/prompts@7.10.1(@types/node@24.10.1))(@types/debug@4.1.12)(@types/node@24.10.1)(@vitest/ui@3.2.4)(jiti@2.6.1)(jsdom@27.2.0)(terser@5.44.1)(tsx@4.21.0)(typescript@5.9.3)(yaml@2.8.1) + apps/fastmail-eventsource-daemon: + dependencies: + '@prisma/client': + specifier: 6.8.2 + version: 6.8.2(prisma@6.8.2(typescript@5.9.3))(typescript@5.9.3) + '@t3-oss/env-core': + specifier: 0.13.8 + version: 0.13.8(typescript@5.9.3)(valibot@1.1.0(typescript@5.9.3))(zod@3.25.46) + dotenv: + specifier: 17.2.3 + version: 17.2.3 + eventsource: + specifier: 3.0.6 + version: 3.0.6 + zod: + specifier: 3.25.46 + version: 3.25.46 + devDependencies: + '@types/node': + specifier: 24.10.1 + version: 24.10.1 + prisma: + specifier: 6.8.2 + version: 6.8.2(typescript@5.9.3) + tsx: + specifier: 4.21.0 + version: 4.21.0 + typescript: + specifier: 5.9.3 + version: 5.9.3 + apps/unsubscriber: dependencies: '@ai-sdk/amazon-bedrock': @@ -3947,12 +3978,30 @@ packages: typescript: optional: true + '@prisma/client@6.8.2': + resolution: {integrity: sha512-5II+vbyzv4si6Yunwgkj0qT/iY0zyspttoDrL3R4BYgLdp42/d2C8xdi9vqkrYtKt9H32oFIukvyw3Koz5JoDg==} + engines: {node: '>=18.18'} + peerDependencies: + prisma: '*' + typescript: '>=5.1.0' + peerDependenciesMeta: + prisma: + optional: true + typescript: + optional: true + '@prisma/config@6.16.3': resolution: {integrity: sha512-VlsLnG4oOuKGGMToEeVaRhoTBZu5H3q51jTQXb/diRags3WV0+BQK5MolJTtP6G7COlzoXmWeS11rNBtvg+qFQ==} + '@prisma/config@6.8.2': + resolution: {integrity: sha512-ZJY1fF4qRBPdLQ/60wxNtX+eu89c3AkYEcP7L3jkp0IPXCNphCYxikTg55kPJLDOG6P0X+QG5tCv6CmsBRZWFQ==} + '@prisma/debug@6.16.3': resolution: {integrity: sha512-89DdqWtdKd7qoc9/qJCKLTazj3W3zPEiz0hc7HfZdpjzm21c7orOUB5oHWJsG+4KbV4cWU5pefq3CuDVYF9vgA==} + '@prisma/debug@6.8.2': + resolution: {integrity: sha512-4muBSSUwJJ9BYth5N8tqts8JtiLT8QI/RSAzEogwEfpbYGFo9mYsInsVo8dqXdPO2+Rm5OG5q0qWDDE3nyUbVg==} + '@prisma/debug@7.1.0': resolution: {integrity: sha512-pPAckG6etgAsEBusmZiFwM9bldLSNkn++YuC4jCTJACdK5hLOVnOzX7eSL2FgaU6Gomd6wIw21snUX2dYroMZQ==} @@ -3962,15 +4011,27 @@ packages: '@prisma/engines-version@6.16.1-1.bb420e667c1820a8c05a38023385f6cc7ef8e83a': resolution: {integrity: sha512-fftRmosBex48Ph1v2ll1FrPpirwtPZpNkE5CDCY1Lw2SD2ctyrLlVlHiuxDAAlALwWBOkPbAll4+EaqdGuMhJw==} + '@prisma/engines-version@6.8.0-43.2060c79ba17c6bb9f5823312b6f6b7f4a845738e': + resolution: {integrity: sha512-Rkik9lMyHpFNGaLpPF3H5q5TQTkm/aE7DsGM5m92FZTvWQsvmi6Va8On3pWvqLHOt5aPUvFb/FeZTmphI4CPiQ==} + '@prisma/engines@6.16.3': resolution: {integrity: sha512-b+Rl4nzQDcoqe6RIpSHv8f5lLnwdDGvXhHjGDiokObguAAv/O1KaX1Oc69mBW/GFWKQpCkOraobLjU6s1h8HGg==} + '@prisma/engines@6.8.2': + resolution: {integrity: sha512-XqAJ//LXjqYRQ1RRabs79KOY4+v6gZOGzbcwDQl0D6n9WBKjV7qdrbd042CwSK0v0lM9MSHsbcFnU2Yn7z8Zlw==} + '@prisma/fetch-engine@6.16.3': resolution: {integrity: sha512-bUoRIkVaI+CCaVGrSfcKev0/Mk4ateubqWqGZvQ9uCqFv2ENwWIR3OeNuGin96nZn5+SkebcD7RGgKr/+mJelw==} + '@prisma/fetch-engine@6.8.2': + resolution: {integrity: sha512-lCvikWOgaLOfqXGacEKSNeenvj0n3qR5QvZUOmPE2e1Eh8cMYSobxonCg9rqM6FSdTfbpqp9xwhSAOYfNqSW0g==} + '@prisma/get-platform@6.16.3': resolution: {integrity: sha512-X1LxiFXinJ4iQehrodGp0f66Dv6cDL0GbRlcCoLtSu6f4Wi+hgo7eND/afIs5029GQLgNWKZ46vn8hjyXTsHLA==} + '@prisma/get-platform@6.8.2': + resolution: {integrity: sha512-vXSxyUgX3vm1Q70QwzwkjeYfRryIvKno1SXbIqwSptKwqKzskINnDUcx85oX+ys6ooN2ATGSD0xN2UTfg6Zcow==} + '@prisma/instrumentation@6.19.0': resolution: {integrity: sha512-QcuYy25pkXM8BJ37wVFBO7Zh34nyRV1GOb2n3lPkkbRYfl4hWl3PTcImP41P0KrzVXfa/45p6eVCos27x3exIg==} peerDependencies: @@ -8084,6 +8145,10 @@ packages: resolution: {integrity: sha512-IzUmBGPR3+oUG9dUeXynyNmf91/3zUSJg1lCktzKw47OXuhco54U3r9B7O4XX+Rb1Itm9OZ2b0RkTs10bICOxA==} engines: {node: '>=12.0.0'} + eventsource@3.0.6: + resolution: {integrity: sha512-l19WpE2m9hSuyP06+FbuUUf1G+R0SFLrtQfbRb9PRr+oimOfxQhgGCbVaXg5IvZyyTThJsxh6L/srkMiCeBPDA==} + engines: {node: '>=18.0.0'} + eventsource@3.0.7: resolution: {integrity: sha512-CRT1WTyuQoD771GW56XEZFQ/ZoSfWid1alKGDYMmkt2yl8UXrVR4pspqWNEcqKvVIzg6PAltWjxcSSPrboA4iA==} engines: {node: '>=18.0.0'} @@ -10644,6 +10709,16 @@ packages: typescript: optional: true + prisma@6.8.2: + resolution: {integrity: sha512-JNricTXQxzDtRS7lCGGOB4g5DJ91eg3nozdubXze3LpcMl1oWwcFddrj++Up3jnRE6X/3gB/xz3V+ecBk/eEGA==} + engines: {node: '>=18.18'} + hasBin: true + peerDependencies: + typescript: '>=5.1.0' + peerDependenciesMeta: + typescript: + optional: true + prismjs@1.27.0: resolution: {integrity: sha512-t13BGPUlFDR7wRB5kQDG4jjl7XeuH6jbJGt11JHPL96qwsEHNX2+68tFXqc1/k+/jALsbSWJKUOT/hcYAZ5LkA==} engines: {node: '>=6'} @@ -16458,6 +16533,11 @@ snapshots: prisma: 6.16.3(magicast@0.3.5)(typescript@5.9.3) typescript: 5.9.3 + '@prisma/client@6.8.2(prisma@6.8.2(typescript@5.9.3))(typescript@5.9.3)': + optionalDependencies: + prisma: 6.8.2(typescript@5.9.3) + typescript: 5.9.3 + '@prisma/config@6.16.3(magicast@0.3.5)': dependencies: c12: 3.1.0(magicast@0.3.5) @@ -16467,8 +16547,14 @@ snapshots: transitivePeerDependencies: - magicast + '@prisma/config@6.8.2': + dependencies: + jiti: 2.4.2 + '@prisma/debug@6.16.3': {} + '@prisma/debug@6.8.2': {} + '@prisma/debug@7.1.0': {} '@prisma/driver-adapter-utils@7.1.0': @@ -16477,6 +16563,8 @@ snapshots: '@prisma/engines-version@6.16.1-1.bb420e667c1820a8c05a38023385f6cc7ef8e83a': {} + '@prisma/engines-version@6.8.0-43.2060c79ba17c6bb9f5823312b6f6b7f4a845738e': {} + '@prisma/engines@6.16.3': dependencies: '@prisma/debug': 6.16.3 @@ -16484,16 +16572,33 @@ snapshots: '@prisma/fetch-engine': 6.16.3 '@prisma/get-platform': 6.16.3 + '@prisma/engines@6.8.2': + dependencies: + '@prisma/debug': 6.8.2 + '@prisma/engines-version': 6.8.0-43.2060c79ba17c6bb9f5823312b6f6b7f4a845738e + '@prisma/fetch-engine': 6.8.2 + '@prisma/get-platform': 6.8.2 + '@prisma/fetch-engine@6.16.3': dependencies: '@prisma/debug': 6.16.3 '@prisma/engines-version': 6.16.1-1.bb420e667c1820a8c05a38023385f6cc7ef8e83a '@prisma/get-platform': 6.16.3 + '@prisma/fetch-engine@6.8.2': + dependencies: + '@prisma/debug': 6.8.2 + '@prisma/engines-version': 6.8.0-43.2060c79ba17c6bb9f5823312b6f6b7f4a845738e + '@prisma/get-platform': 6.8.2 + '@prisma/get-platform@6.16.3': dependencies: '@prisma/debug': 6.16.3 + '@prisma/get-platform@6.8.2': + dependencies: + '@prisma/debug': 6.8.2 + '@prisma/instrumentation@6.19.0(@opentelemetry/api@1.9.0)': dependencies: '@opentelemetry/api': 1.9.0 @@ -21755,6 +21860,10 @@ snapshots: eventsource@2.0.2: {} + eventsource@3.0.6: + dependencies: + eventsource-parser: 3.0.6 + eventsource@3.0.7: dependencies: eventsource-parser: 3.0.6 @@ -24888,6 +24997,13 @@ snapshots: transitivePeerDependencies: - magicast + prisma@6.8.2(typescript@5.9.3): + dependencies: + '@prisma/config': 6.8.2 + '@prisma/engines': 6.8.2 + optionalDependencies: + typescript: 5.9.3 + prismjs@1.27.0: {} prismjs@1.30.0: {} @@ -27092,7 +27208,7 @@ snapshots: nypm: 0.6.2 trpc-cli: 0.10.2(@inquirer/prompts@7.10.1(@types/node@24.10.1))(typescript@5.9.3) vitest: 3.2.4(@types/debug@4.1.12)(@types/node@24.10.1)(@vitest/ui@3.2.4)(jiti@2.6.1)(jsdom@27.2.0)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.1) - zod: 4.1.12 + zod: 4.1.13 transitivePeerDependencies: - '@edge-runtime/vm' - '@inquirer/prompts'