Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions apps/web/app/api/chats/[chatId]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,24 @@ import { withEmailAccount } from "@/utils/middleware";

export type GetChatResponse = Awaited<ReturnType<typeof getChat>>;

export const GET = withEmailAccount(async (request, { params }) => {
const { emailAccountId } = request.auth;
const { chatId } = await params;
export const GET = withEmailAccount(
"chats/detail",
async (request, { params }) => {
const { emailAccountId } = request.auth;
const { chatId } = await params;

if (!chatId) {
return NextResponse.json(
{ error: "Chat ID is required." },
{ status: 400 },
);
}
if (!chatId) {
return NextResponse.json(
{ error: "Chat ID is required." },
{ status: 400 },
);
}

const chat = await getChat({ chatId, emailAccountId });
const chat = await getChat({ chatId, emailAccountId });

return NextResponse.json(chat);
});
return NextResponse.json(chat);
},
);

async function getChat({
chatId,
Expand Down
2 changes: 1 addition & 1 deletion apps/web/app/api/chats/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { withEmailAccount } from "@/utils/middleware";

export type GetChatsResponse = Awaited<ReturnType<typeof getChats>>;

export const GET = withEmailAccount(async (request) => {
export const GET = withEmailAccount("chats", async (request) => {
const emailAccountId = request.auth.emailAccountId;
const result = await getChats({ emailAccountId });
return NextResponse.json(result);
Expand Down
2 changes: 1 addition & 1 deletion apps/web/app/api/clean/history/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ async function getCleanHistory({ emailAccountId }: { emailAccountId: string }) {
return { result };
}

export const GET = withEmailAccount(async (request) => {
export const GET = withEmailAccount("clean/history", async (request) => {
const emailAccountId = request.auth.emailAccountId;

const result = await getCleanHistory({ emailAccountId });
Expand Down
26 changes: 15 additions & 11 deletions apps/web/app/api/email-stream/route.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
import { createScopedLogger } from "@/utils/logger";
import { RedisSubscriber } from "@/utils/redis/subscriber";
import { withAuth } from "@/utils/middleware";
import { NextResponse } from "next/server";
import { getEmailAccount } from "@/utils/redis/account-validation";

export const maxDuration = 300;

const logger = createScopedLogger("email-stream");

// 5 minutes in milliseconds
const INACTIVITY_TIMEOUT = 5 * 60 * 1000;

export const GET = withAuth(async (request) => {
export const GET = withAuth("email-stream", async (request) => {
const { userId } = request.auth;

const url = new URL(request.url);
const emailAccountId = url.searchParams.get("emailAccountId");

if (!emailAccountId) {
logger.warn("Bad Request: Email Account ID missing from query parameters.");
request.logger.warn(
"Bad Request: Email Account ID missing from query parameters.",
);
return NextResponse.json(
{ error: "Email account ID is required" },
{ status: 400 },
Expand All @@ -30,7 +29,7 @@ export const GET = withAuth(async (request) => {
if (!email)
return NextResponse.json({ error: "Invalid account ID" }, { status: 403 });

logger.info("Processing GET request for email stream", {
request.logger.info("Processing GET request for email stream", {
userId,
emailAccountId,
});
Expand All @@ -39,7 +38,8 @@ export const GET = withAuth(async (request) => {
const redisSubscriber = RedisSubscriber.getInstance();

redisSubscriber.psubscribe(pattern, (err) => {
if (err) logger.error("Error subscribing to threads", { error: err });
if (err)
request.logger.error("Error subscribing to threads", { error: err });
});

// Set headers for SSE
Expand All @@ -51,7 +51,7 @@ export const GET = withAuth(async (request) => {
"X-Accel-Buffering": "no", // For anyone using Nginx
});

logger.info("Creating SSE stream", { emailAccountId });
request.logger.info("Creating SSE stream", { emailAccountId });

const encoder = new TextEncoder();

Expand All @@ -64,7 +64,9 @@ export const GET = withAuth(async (request) => {
const resetInactivityTimer = () => {
if (inactivityTimer) clearTimeout(inactivityTimer);
inactivityTimer = setTimeout(() => {
logger.info("Stream closed due to inactivity", { emailAccountId });
request.logger.info("Stream closed due to inactivity", {
emailAccountId,
});
if (!isControllerClosed) {
isControllerClosed = true;
controller.close();
Expand All @@ -85,7 +87,7 @@ export const GET = withAuth(async (request) => {
);
resetInactivityTimer(); // Reset timer on message
} catch (error) {
logger.error("Error enqueueing message", { error });
request.logger.error("Error enqueueing message", { error });
// If we hit an error, mark controller as closed and clean up
isControllerClosed = true;
redisSubscriber.punsubscribe(pattern);
Expand All @@ -94,7 +96,9 @@ export const GET = withAuth(async (request) => {
});

request.signal.addEventListener("abort", () => {
logger.info("Cleaning up Redis subscription", { emailAccountId });
request.logger.info("Cleaning up Redis subscription", {
emailAccountId,
});
clearTimeout(inactivityTimer);
if (!isControllerClosed) {
isControllerClosed = true;
Expand Down
33 changes: 18 additions & 15 deletions apps/web/app/api/google/calendar/auth-url/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,21 @@ const getAuthUrl = ({ emailAccountId }: { emailAccountId: string }) => {
return { url, state };
};

export const GET = withEmailAccount(async (request) => {
const { emailAccountId } = request.auth;
const { url, state } = getAuthUrl({ emailAccountId });

const res: GetCalendarAuthUrlResponse = { url };
const response = NextResponse.json(res);

response.cookies.set(
CALENDAR_STATE_COOKIE_NAME,
state,
oauthStateCookieOptions,
);

return response;
});
export const GET = withEmailAccount(
"google/calendar/auth-url",
async (request) => {
const { emailAccountId } = request.auth;
const { url, state } = getAuthUrl({ emailAccountId });

const res: GetCalendarAuthUrlResponse = { url };
const response = NextResponse.json(res);

response.cookies.set(
CALENDAR_STATE_COOKIE_NAME,
state,
oauthStateCookieOptions,
);

return response;
},
);
2 changes: 1 addition & 1 deletion apps/web/app/api/google/contacts/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async function getContacts(client: people_v1.People, query: string) {
return { result };
}

export const GET = withEmailAccount(async (request) => {
export const GET = withEmailAccount("google/contacts", async (request) => {
if (!env.NEXT_PUBLIC_CONTACTS_ENABLED)
return NextResponse.json({ error: "Contacts API not enabled" });

Expand Down
2 changes: 1 addition & 1 deletion apps/web/app/api/google/linking/auth-url/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const getAuthUrl = ({ userId }: { userId: string }) => {
return { url, state };
};

export const GET = withAuth(async (request) => {
export const GET = withAuth("google/linking/auth-url", async (request) => {
const userId = request.auth.userId;
const { url, state } = getAuthUrl({ userId });

Expand Down
2 changes: 1 addition & 1 deletion apps/web/app/api/google/watch/all/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ async function watchAllEmails() {
return NextResponse.json({ success: true });
}

export const GET = withError(async (request) => {
export const GET = withError("google/watch/all", async (request) => {
if (!hasCronSecret(request)) {
captureException(
new Error("Unauthorized cron request: api/google/watch/all"),
Expand Down
11 changes: 5 additions & 6 deletions apps/web/app/api/google/watch/route.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
import { NextResponse } from "next/server";
import { watchEmails } from "./controller";
import { withAuth } from "@/utils/middleware";
import { createScopedLogger } from "@/utils/logger";
import prisma from "@/utils/prisma";
import { createEmailProvider } from "@/utils/email/provider";

export const dynamic = "force-dynamic";

const logger = createScopedLogger("api/google/watch");

export const GET = withAuth(async (request) => {
export const GET = withAuth("google/watch", async (request) => {
const userId = request.auth.userId;
const results = [];

Expand Down Expand Up @@ -43,15 +40,17 @@ export const GET = withAuth(async (request) => {
expirationDate,
});
} else {
logger.error("Error watching inbox for account", { emailAccountId });
request.logger.error("Error watching inbox for account", {
emailAccountId,
});
results.push({
emailAccountId,
status: "error",
message: "Failed to set up watch for this account.",
});
}
} catch (error) {
logger.error("Exception while watching inbox for account", {
request.logger.error("Exception while watching inbox for account", {
emailAccountId,
error,
});
Expand Down
2 changes: 1 addition & 1 deletion apps/web/app/api/knowledge/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export type GetKnowledgeResponse = {
items: Knowledge[];
};

export const GET = withEmailAccount(async (request) => {
export const GET = withEmailAccount("knowledge", async (request) => {
const emailAccountId = request.auth.emailAccountId;
const items = await prisma.knowledge.findMany({
where: { emailAccountId },
Expand Down
7 changes: 2 additions & 5 deletions apps/web/app/api/labels/route.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import { NextResponse } from "next/server";
import { withEmailProvider } from "@/utils/middleware";
import { createScopedLogger } from "@/utils/logger";

const logger = createScopedLogger("labels");

export type UnifiedLabel = {
id: string;
Expand All @@ -23,7 +20,7 @@ export type LabelsResponse = {
export const dynamic = "force-dynamic";
export const maxDuration = 30;

export const GET = withEmailProvider(async (request) => {
export const GET = withEmailProvider("labels", async (request) => {
const { emailProvider } = request;

try {
Expand All @@ -39,7 +36,7 @@ export const GET = withEmailProvider(async (request) => {
}));
return NextResponse.json({ labels: unifiedLabels });
} catch (error) {
logger.error("Error fetching labels", { error });
request.logger.error("Error fetching labels", { error });
return NextResponse.json({ labels: [] }, { status: 500 });
}
});
91 changes: 46 additions & 45 deletions apps/web/app/api/mcp/[integration]/auth-url/route.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { NextResponse } from "next/server";
import { env } from "@/env";
import { withEmailAccount } from "@/utils/middleware";
import { createScopedLogger } from "@/utils/logger";
import { SafeError } from "@/utils/error";
import {
oauthStateCookieOptions,
Expand All @@ -15,59 +14,61 @@ import { generateOAuthUrl } from "@/utils/mcp/oauth";

export type GetMcpAuthUrlResponse = { url: string };

export const GET = withEmailAccount(async (request, { params }) => {
const { integration } = await params;
const { emailAccountId } = request.auth;
const userId = request.auth.userId;
export const GET = withEmailAccount(
"mcp/auth-url",
async (request, { params }) => {
const { integration } = await params;
const { emailAccountId } = request.auth;
const userId = request.auth.userId;

const logger = createScopedLogger("mcp/auth-url").with({
userId,
integration,
});
const logger = request.logger.with({
integration,
});

const integrationConfig = getIntegration(integration);
const integrationConfig = getIntegration(integration);

if (!integrationConfig) {
throw new SafeError(`Integration ${integration} not found`);
}
if (!integrationConfig) {
throw new SafeError(`Integration ${integration} not found`);
}

if (integrationConfig.authType !== "oauth") {
throw new SafeError(`Integration ${integration} does not support OAuth`);
}
if (integrationConfig.authType !== "oauth") {
throw new SafeError(`Integration ${integration} does not support OAuth`);
}

try {
const redirectUri = `${env.NEXT_PUBLIC_BASE_URL}/api/mcp/${integration}/callback`;
try {
const redirectUri = `${env.NEXT_PUBLIC_BASE_URL}/api/mcp/${integration}/callback`;

const state = generateOAuthState({
userId,
emailAccountId,
type: getMcpOAuthStateType(integration),
});
const state = generateOAuthState({
userId,
emailAccountId,
type: getMcpOAuthStateType(integration),
});

const { url, codeVerifier } = await generateOAuthUrl({
integration,
redirectUri,
state,
});
const { url, codeVerifier } = await generateOAuthUrl({
integration,
redirectUri,
state,
});

// Set secure cookies for state and PKCE verifier
const response = NextResponse.json<GetMcpAuthUrlResponse>({ url });
// Set secure cookies for state and PKCE verifier
const response = NextResponse.json<GetMcpAuthUrlResponse>({ url });

const maxAge = 60 * 10; // 10 minutes
const maxAge = 60 * 10; // 10 minutes

response.cookies.set(getMcpStateCookieName(integration), state, {
...oauthStateCookieOptions,
maxAge,
});
response.cookies.set(getMcpStateCookieName(integration), state, {
...oauthStateCookieOptions,
maxAge,
});

response.cookies.set(getMcpPkceCookieName(integration), codeVerifier, {
...oauthStateCookieOptions,
maxAge,
});
response.cookies.set(getMcpPkceCookieName(integration), codeVerifier, {
...oauthStateCookieOptions,
maxAge,
});

return response;
} catch (error) {
logger.error("Failed to generate MCP auth URL", { error });
throw new SafeError("Failed to generate authorization URL");
}
});
return response;
} catch (error) {
logger.error("Failed to generate MCP auth URL", { error });
throw new SafeError("Failed to generate authorization URL");
}
},
);
Loading
Loading