Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions apps/web/__tests__/ai-choose-args.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ import type { ParsedMessage } from "@/utils/types";
import { getActionItemsWithAiArgs } from "@/utils/ai/choose-rule/choose-args";
import { getEmailAccount, getAction, getRule } from "@/__tests__/helpers";
import { ActionType } from "@prisma/client";
import { createScopedLogger } from "@/utils/logger";

// pnpm test-ai ai-choose-args

const logger = createScopedLogger("test");

const isAiTest = process.env.RUN_AI_TESTS === "true";

const TIMEOUT = 15_000;
Expand All @@ -26,6 +29,7 @@ describe.runIf(isAiTest)("getActionItemsWithAiArgs", () => {
selectedRule: rule,
client: {} as any,
modelType: "default",
logger: logger,
});

expect(result).toEqual(actions);
Expand All @@ -49,6 +53,7 @@ describe.runIf(isAiTest)("getActionItemsWithAiArgs", () => {
selectedRule: rule,
client: {} as any,
modelType: "default",
logger: logger,
});

expect(result).toHaveLength(1);
Expand Down Expand Up @@ -79,6 +84,7 @@ describe.runIf(isAiTest)("getActionItemsWithAiArgs", () => {
selectedRule: rule,
client: {} as any,
modelType: "default",
logger: logger,
});

expect(result).toHaveLength(1);
Expand Down Expand Up @@ -109,6 +115,7 @@ describe.runIf(isAiTest)("getActionItemsWithAiArgs", () => {
selectedRule: rule,
client: {} as any,
modelType: "default",
logger: logger,
});

expect(result).toHaveLength(2);
Expand Down Expand Up @@ -147,6 +154,7 @@ Matt`,
selectedRule: rule,
client: {} as any,
modelType: "default",
logger: logger,
});

expect(result).toHaveLength(2);
Expand Down Expand Up @@ -189,6 +197,7 @@ Matt`,
selectedRule: rule,
client: {} as any,
modelType: "default",
logger: logger,
});

expect(result).toHaveLength(1);
Expand Down
24 changes: 16 additions & 8 deletions apps/web/app/api/chat/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { withEmailAccount } from "@/utils/middleware";
import { getEmailAccountWithAi } from "@/utils/user/get";
import { NextResponse } from "next/server";
import { aiProcessAssistantChat } from "@/utils/ai/assistant/chat";
import { createScopedLogger } from "@/utils/logger";
import type { Logger } from "@/utils/logger";
import prisma from "@/utils/prisma";
import type { Prisma } from "@prisma/client";
import { convertToUIMessages } from "@/components/assistant-chat/helpers";
Expand All @@ -13,8 +13,6 @@ import { messageContextSchema } from "@/app/api/chat/validation";

export const maxDuration = 120;

const logger = createScopedLogger("api/chat");

const textPartSchema = z.object({
text: z.string().min(1).max(3000),
type: z.enum(["text"]),
Expand All @@ -30,7 +28,7 @@ const assistantInputSchema = z.object({
context: messageContextSchema.optional(),
});

export const POST = withEmailAccount(async (request) => {
export const POST = withEmailAccount("chat", async (request) => {
const emailAccountId = request.auth.emailAccountId;

const user = await getEmailAccountWithAi({ emailAccountId });
Expand All @@ -44,7 +42,11 @@ export const POST = withEmailAccount(async (request) => {

const chat =
(await getChatById(data.id)) ||
(await createNewChat({ emailAccountId, chatId: data.id }));
(await createNewChat({
emailAccountId,
chatId: data.id,
logger: request.logger,
}));

if (!chat) {
return NextResponse.json(
Expand Down Expand Up @@ -80,11 +82,11 @@ export const POST = withEmailAccount(async (request) => {

return result.toUIMessageStreamResponse({
onFinish: async ({ messages }) => {
await saveChatMessages(messages, chat.id);
await saveChatMessages(messages, chat.id, request.logger);
},
});
} catch (error) {
logger.error("Error in assistant chat", { error });
request.logger.error("Error in assistant chat", { error });
return NextResponse.json(
{ error: "Error in assistant chat" },
{ status: 500 },
Expand All @@ -95,9 +97,11 @@ export const POST = withEmailAccount(async (request) => {
async function createNewChat({
emailAccountId,
chatId,
logger,
}: {
emailAccountId: string;
chatId: string;
logger: Logger;
}) {
try {
const newChat = await prisma.chat.create({
Expand All @@ -124,7 +128,11 @@ async function saveChatMessage(message: Prisma.ChatMessageCreateInput) {
return prisma.chatMessage.create({ data: message });
}

async function saveChatMessages(messages: UIMessage[], chatId: string) {
async function saveChatMessages(
messages: UIMessage[],
chatId: string,
logger: Logger,
) {
try {
return prisma.chatMessage.createMany({
data: messages.map((message) => ({
Expand Down
19 changes: 11 additions & 8 deletions apps/web/app/api/clean/gmail/route.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
import { type NextRequest, NextResponse } from "next/server";
import { NextResponse } from "next/server";
import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";
import { z } from "zod";
import { withError } from "@/utils/middleware";
import { withError, type RequestWithLogger } from "@/utils/middleware";
import { getGmailClientWithRefresh } from "@/utils/gmail/client";
import { GmailLabel, labelThread } from "@/utils/gmail/label";
import { SafeError } from "@/utils/error";
import prisma from "@/utils/prisma";
import { isDefined } from "@/utils/types";
import { createScopedLogger } from "@/utils/logger";
import type { Logger } from "@/utils/logger";
import { CleanAction } from "@prisma/client";
import { updateThread } from "@/utils/redis/clean";

const logger = createScopedLogger("api/clean/gmail");

const cleanGmailSchema = z.object({
emailAccountId: z.string(),
threadId: z.string(),
Expand All @@ -34,7 +32,8 @@ async function performGmailAction({
processedLabelId,
jobId,
action,
}: CleanGmailBody) {
logger,
}: CleanGmailBody & { logger: Logger }) {
const account = await prisma.emailAccount.findUnique({
where: { id: emailAccountId },
select: {
Expand Down Expand Up @@ -138,11 +137,15 @@ async function saveToDatabase({
}

export const POST = withError(
verifySignatureAppRouter(async (request: NextRequest) => {
"clean/gmail",
verifySignatureAppRouter(async (request: Request) => {
const json = await request.json();
const body = cleanGmailSchema.parse(json);

await performGmailAction(body);
await performGmailAction({
...body,
logger: (request as RequestWithLogger).logger,
});

return NextResponse.json({ success: true });
}),
Expand Down
17 changes: 11 additions & 6 deletions apps/web/app/api/clean/route.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";
import { z } from "zod";
import { NextResponse } from "next/server";
import { withError } from "@/utils/middleware";
import { withError, type RequestWithLogger } from "@/utils/middleware";
import { publishToQstash } from "@/utils/upstash";
import { getThreadMessages } from "@/utils/gmail/thread";
import { getGmailClientWithRefresh } from "@/utils/gmail/client";
import type { CleanGmailBody } from "@/app/api/clean/gmail/route";
import { SafeError } from "@/utils/error";
import { createScopedLogger } from "@/utils/logger";
import type { Logger } from "@/utils/logger";
import { aiClean } from "@/utils/ai/clean/ai-clean";
import { getEmailForLLM } from "@/utils/get-email-from-message";
import {
Expand All @@ -25,8 +25,6 @@ import { CleanAction } from "@prisma/client";
import type { ParsedMessage } from "@/utils/types";
import { isActivePremium } from "@/utils/premium";

const logger = createScopedLogger("api/clean");

const cleanThreadBody = z.object({
emailAccountId: z.string(),
threadId: z.string(),
Expand Down Expand Up @@ -56,7 +54,8 @@ async function cleanThread({
action,
instructions,
skips,
}: CleanThreadBody) {
logger,
}: CleanThreadBody & { logger: Logger }) {
// 1. get thread with messages
// 2. process thread with ai / fixed logic
// 3. add to gmail action queue
Expand Down Expand Up @@ -112,6 +111,7 @@ async function cleanThread({
processedLabelId,
jobId,
action,
logger,
});

function isStarred(message: ParsedMessage) {
Expand Down Expand Up @@ -234,13 +234,15 @@ function getPublish({
processedLabelId,
jobId,
action,
logger,
}: {
emailAccountId: string;
threadId: string;
markedDoneLabelId: string;
processedLabelId: string;
jobId: string;
action: CleanAction;
logger: Logger;
}) {
return async ({ markDone }: { markDone: boolean }) => {
// max rate:
Expand Down Expand Up @@ -296,7 +298,10 @@ export const POST = withError(
const json = await request.json();
const body = cleanThreadBody.parse(json);

await cleanThread(body);
await cleanThread({
...body,
logger: (request as RequestWithLogger).logger,
});

return NextResponse.json({ success: true });
}),
Expand Down
1 change: 1 addition & 0 deletions apps/web/app/api/google/watch/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export const GET = withAuth("google/watch", async (request) => {
const emailProvider = await createEmailProvider({
emailAccountId,
provider: "google",
logger: request.logger,
});
const expirationDate = await watchEmails({
emailAccountId,
Expand Down
1 change: 1 addition & 0 deletions apps/web/app/api/google/webhook/process-history-item.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export async function processHistoryItem(
const provider = await createEmailProvider({
emailAccountId,
provider: "google",
logger,
});

// Handle Google-specific label events
Expand Down
1 change: 1 addition & 0 deletions apps/web/app/api/outlook/webhook/process-history.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ export async function processHistoryForUser({
const provider = await createEmailProvider({
emailAccountId: validatedEmailAccount.id,
provider: accountProvider,
logger,
});

try {
Expand Down
1 change: 1 addition & 0 deletions apps/web/app/api/resend/digest/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ async function sendEmail({
const emailProvider = await createEmailProvider({
emailAccountId,
provider: emailAccount.account.provider,
logger,
});

const digestScheduleData = await getDigestSchedule({ emailAccountId });
Expand Down
2 changes: 2 additions & 0 deletions apps/web/app/api/scheduled-actions/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,12 @@ export const POST = verifySignatureAppRouter(
const provider = await createEmailProvider({
emailAccountId: scheduledAction.emailAccountId,
provider: scheduledAction.emailAccount.account.provider,
logger,
});
const executionResult = await executeScheduledAction(
scheduledAction,
provider,
logger,
);

if (executionResult.success) {
Expand Down
5 changes: 5 additions & 0 deletions apps/web/app/api/user/no-reply/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,25 @@ import { NextResponse } from "next/server";
import { isDefined } from "@/utils/types";
import { withEmailProvider } from "@/utils/middleware";
import { createEmailProvider } from "@/utils/email/provider";
import type { Logger } from "@/utils/logger";

export type NoReplyResponse = Awaited<ReturnType<typeof getNoReply>>;

async function getNoReply({
emailAccountId,
userEmail,
provider,
logger,
}: {
emailAccountId: string;
userEmail: string;
provider: string;
logger: Logger;
}) {
const emailProvider = await createEmailProvider({
emailAccountId,
provider,
logger,
});

const sentEmails = await emailProvider.getSentMessages(50);
Expand Down Expand Up @@ -53,6 +57,7 @@ export const GET = withEmailProvider("user/no-reply", async (request) => {
emailAccountId,
userEmail,
provider: request.emailProvider.name,
logger: request.logger,
});

return NextResponse.json(result);
Expand Down
7 changes: 5 additions & 2 deletions apps/web/app/api/user/rules/[id]/example/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import { fetchPaginatedMessages } from "@/app/api/user/group/[groupId]/messages/
import { isGroupRule, isAIRule, isStaticRule } from "@/utils/condition";
import { LogicalOperator } from "@prisma/client";
import type { EmailProvider } from "@/utils/email/types";
import type { Logger } from "@/utils/logger";

export async function fetchExampleMessages(
rule: RuleWithGroup,
emailProvider: EmailProvider,
logger: Logger,
) {
const isStatic = isStaticRule(rule);
const isGroup = isGroupRule(rule);
Expand All @@ -32,7 +34,7 @@ export async function fetchExampleMessages(
)
return [];

if (isStatic) return fetchStaticExampleMessages(rule, emailProvider);
if (isStatic) return fetchStaticExampleMessages(rule, emailProvider, logger);

if (isGroup) {
if (!rule.group) return [];
Expand All @@ -50,6 +52,7 @@ export async function fetchExampleMessages(
async function fetchStaticExampleMessages(
rule: RuleWithGroup,
emailProvider: EmailProvider,
logger: Logger,
): Promise<MessageWithGroupItem[]> {
// Build structured query options instead of provider-specific query strings
const options: Parameters<EmailProvider["getMessagesByFields"]>[0] = {
Expand All @@ -70,6 +73,6 @@ async function fetchStaticExampleMessages(

// search might include messages that don't match the rule, so we filter those out
return response.messages.filter((message) =>
matchesStaticRule(rule, message),
matchesStaticRule(rule, message, logger),
);
}
Loading
Loading