Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions apps/api/src/app/api/integrations/linear/callback/route.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { LinearClient } from "@linear/sdk";
import { db } from "@superset/db/client";
import { integrationConnections, members } from "@superset/db/schema";
import { linearTokenResponseSchema } from "@superset/trpc/integrations/linear";
import { Client } from "@upstash/qstash";
import { and, eq } from "drizzle-orm";

Expand Down Expand Up @@ -73,18 +74,15 @@ export async function GET(request: Request) {
);
}

const tokenData: { access_token: string; expires_in?: number } =
await tokenResponse.json();
const tokenData = linearTokenResponseSchema.parse(await tokenResponse.json());

const linearClient = new LinearClient({
accessToken: tokenData.access_token,
});
const viewer = await linearClient.viewer;
const linearOrg = await viewer.organization;

const tokenExpiresAt = tokenData.expires_in
? new Date(Date.now() + tokenData.expires_in * 1000)
: null;
const tokenExpiresAt = new Date(Date.now() + tokenData.expires_in * 1000);

await db
.insert(integrationConnections)
Expand All @@ -93,6 +91,7 @@ export async function GET(request: Request) {
connectedByUserId: userId,
provider: "linear",
accessToken: tokenData.access_token,
refreshToken: tokenData.refresh_token,
tokenExpiresAt,
externalOrgId: linearOrg.id,
externalOrgName: linearOrg.name,
Expand All @@ -104,7 +103,10 @@ export async function GET(request: Request) {
],
set: {
accessToken: tokenData.access_token,
refreshToken: tokenData.refresh_token,
tokenExpiresAt,
disconnectedAt: null,
disconnectReason: null,
externalOrgId: linearOrg.id,
externalOrgName: linearOrg.name,
connectedByUserId: userId,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
import { LinearClient } from "@linear/sdk";
import type { LinearClient } from "@linear/sdk";
import { buildConflictUpdateColumns, db } from "@superset/db";
import {
integrationConnections,
members,
taskStatuses,
tasks,
users,
} from "@superset/db/schema";
import { members, taskStatuses, tasks, users } from "@superset/db/schema";
import { getLinearClient } from "@superset/trpc/integrations/linear";
import { Receiver } from "@upstash/qstash";
import { and, eq, inArray, isNull } from "drizzle-orm";
import chunk from "lodash.chunk";
Expand Down Expand Up @@ -57,18 +52,14 @@ export async function POST(request: Request) {

const { organizationId, creatorUserId } = parsed.data;

const connection = await db.query.integrationConnections.findFirst({
where: and(
eq(integrationConnections.organizationId, organizationId),
eq(integrationConnections.provider, "linear"),
),
});

if (!connection) {
return Response.json({ error: "No connection found", skipped: true });
const client = await getLinearClient(organizationId);
if (!client) {
return Response.json({
error: "No Linear connection or connection disconnected",
skipped: true,
});
}

const client = new LinearClient({ accessToken: connection.accessToken });
await performInitialSync(client, organizationId, creatorUserId);

return Response.json({ success: true });
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { db } from "@superset/db/client";
import { integrationConnections } from "@superset/db/schema";
import { refreshLinearToken } from "@superset/trpc/integrations/linear";
import { Receiver } from "@upstash/qstash";
import { and, eq, isNotNull, isNull, lt, sql } from "drizzle-orm";
import { env } from "@/env";

const receiver = new Receiver({
currentSigningKey: env.QSTASH_CURRENT_SIGNING_KEY,
nextSigningKey: env.QSTASH_NEXT_SIGNING_KEY,
});

export async function POST(request: Request) {
const body = await request.text();
const signature = request.headers.get("upstash-signature");

const isDev = env.NODE_ENV === "development";

if (!isDev) {
if (!signature) {
return Response.json({ error: "Missing signature" }, { status: 401 });
}

try {
const isValid = await receiver.verify({
body,
signature,
url: `${env.NEXT_PUBLIC_API_URL}/api/integrations/linear/jobs/refresh-tokens`,
});

if (!isValid) {
return Response.json({ error: "Invalid signature" }, { status: 401 });
}
} catch (verifyError) {
console.error(
"[linear-refresh-cron] Signature verification failed:",
verifyError,
);
return Response.json(
{ error: "Signature verification failed" },
{ status: 401 },
);
}
}

const stale = await db.query.integrationConnections.findMany({
where: and(
eq(integrationConnections.provider, "linear"),
isNull(integrationConnections.disconnectedAt),
isNotNull(integrationConnections.refreshToken),
lt(
integrationConnections.tokenExpiresAt,
sql`now() + interval '90 minutes'`,
),
),
columns: { id: true },
});

const results = await Promise.allSettled(
stale.map(async (connection) => {
try {
await refreshLinearToken(connection.id);
return { id: connection.id, ok: true };
} catch (error) {
console.error(
`[linear-refresh-cron] failed for ${connection.id}:`,
error,
);
return { id: connection.id, ok: false };
}
}),
);
Comment on lines +59 to +72
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Cap the cron's refresh fan-out.

Promise.allSettled(stale.map(...)) refreshes every candidate at once. Since this route is the primary refresh path, a larger tenant set will stampede Linear's token endpoint and your advisory-lock/DB path on the same tick. Batch or limit concurrency so the hourly run stays predictable.

Suggested change
+const REFRESH_CONCURRENCY = 20;
+
-	const results = await Promise.allSettled(
-		stale.map(async (connection) => {
-			try {
-				await refreshLinearToken(connection.id);
-				return { id: connection.id, ok: true };
-			} catch (error) {
-				console.error(
-					`[linear-refresh-cron] failed for ${connection.id}:`,
-					error,
-				);
-				return { id: connection.id, ok: false };
-			}
-		}),
-	);
+	const results = [];
+	for (let i = 0; i < stale.length; i += REFRESH_CONCURRENCY) {
+		const batch = stale.slice(i, i + REFRESH_CONCURRENCY);
+		results.push(
+			...(await Promise.allSettled(
+				batch.map(async (connection) => {
+					try {
+						await refreshLinearToken(connection.id);
+						return { id: connection.id, ok: true };
+					} catch (error) {
+						console.error(
+							`[linear-refresh-cron] failed for ${connection.id}:`,
+							error,
+						);
+						return { id: connection.id, ok: false };
+					}
+				}),
+			)),
+		);
+	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const results = await Promise.allSettled(
stale.map(async (connection) => {
try {
await refreshLinearToken(connection.id);
return { id: connection.id, ok: true };
} catch (error) {
console.error(
`[linear-refresh-cron] failed for ${connection.id}:`,
error,
);
return { id: connection.id, ok: false };
}
}),
);
const REFRESH_CONCURRENCY = 20;
const results = [];
for (let i = 0; i < stale.length; i += REFRESH_CONCURRENCY) {
const batch = stale.slice(i, i + REFRESH_CONCURRENCY);
results.push(
...(await Promise.allSettled(
batch.map(async (connection) => {
try {
await refreshLinearToken(connection.id);
return { id: connection.id, ok: true };
} catch (error) {
console.error(
`[linear-refresh-cron] failed for ${connection.id}:`,
error,
);
return { id: connection.id, ok: false };
}
}),
)),
);
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/api/src/app/api/integrations/linear/jobs/refresh-tokens/route.ts` around
lines 48 - 61, The current Promise.allSettled over stale.map causes an
all-at-once fan-out when calling refreshLinearToken for each connection; change
it to limit concurrency (e.g., process in batches or use a concurrency limiter)
so the cron does not stampede Linear or DB locks. Modify the logic around
Promise.allSettled / stale.map and results to iterate over stale in controlled
chunks or use a concurrency utility to call refreshLinearToken(connection.id)
with a max concurrent workers value (e.g., 5-10), ensuring the same
per-connection try/catch and the returned shape { id, ok } is preserved so
downstream code using results remains unchanged.


const succeeded = results.filter(
(result) => result.status === "fulfilled" && result.value.ok,
).length;

return Response.json({ candidates: stale.length, succeeded });
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,21 @@ import {
} from "@superset/ui/alert-dialog";
import { Button } from "@superset/ui/button";
import { useMutation, useQueryClient } from "@tanstack/react-query";
import { Unplug } from "lucide-react";
import { AlertTriangle, Unplug } from "lucide-react";
import { useRouter } from "next/navigation";
import { env } from "@/env";
import { useTRPC } from "@/trpc/react";

interface ConnectionControlsProps {
organizationId: string;
isConnected: boolean;
needsReconnect?: boolean;
}

export function ConnectionControls({
organizationId,
isConnected,
needsReconnect = false,
}: ConnectionControlsProps) {
const trpc = useTRPC();
const router = useRouter();
Expand All @@ -52,6 +54,47 @@ export function ConnectionControls({
disconnectMutation.mutate({ organizationId });
};

if (isConnected && needsReconnect) {
return (
<div className="space-y-3">
<div className="flex items-start gap-2 rounded-md border border-destructive/30 bg-destructive/10 p-3 text-sm text-destructive">
<AlertTriangle className="mt-0.5 size-4 shrink-0" />
<div>Linear authorization expired. Reconnect to resume syncing.</div>
</div>
<div className="flex gap-2">
<Button variant="destructive" onClick={handleConnect}>
Reconnect Linear
</Button>
<AlertDialog>
<AlertDialogTrigger asChild>
<Button variant="outline" disabled={disconnectMutation.isPending}>
<Unplug className="mr-2 size-4" />
{disconnectMutation.isPending
? "Disconnecting..."
: "Disconnect"}
</Button>
</AlertDialogTrigger>
<AlertDialogContent>
<AlertDialogHeader>
<AlertDialogTitle>Disconnect Linear?</AlertDialogTitle>
<AlertDialogDescription>
This will remove the connection between your organization and
Linear. You can reconnect at any time.
</AlertDialogDescription>
</AlertDialogHeader>
<AlertDialogFooter>
<AlertDialogCancel>Cancel</AlertDialogCancel>
<AlertDialogAction onClick={handleDisconnect}>
Disconnect
</AlertDialogAction>
</AlertDialogFooter>
</AlertDialogContent>
</AlertDialog>
</div>
</div>
);
}

if (isConnected) {
return (
<AlertDialog>
Expand Down
11 changes: 9 additions & 2 deletions apps/web/src/app/(dashboard-legacy)/integrations/linear/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
CardHeader,
CardTitle,
} from "@superset/ui/card";
import { ArrowLeft, CheckCircle2 } from "lucide-react";
import { AlertTriangle, ArrowLeft, CheckCircle2 } from "lucide-react";
import Link from "next/link";
import { SiLinear } from "react-icons/si";
import { api } from "@/trpc/server";
Expand All @@ -32,6 +32,7 @@ export default async function LinearIntegrationPage() {
organizationId: organization.id,
});
const isConnected = !!connection;
const needsReconnect = !!connection?.needsReconnect;

return (
<div className="space-y-8">
Expand All @@ -52,7 +53,12 @@ export default async function LinearIntegrationPage() {
<div className="flex-1">
<div className="flex items-center gap-3">
<h1 className="text-2xl font-semibold">Linear</h1>
{isConnected ? (
{needsReconnect ? (
<Badge variant="destructive" className="gap-1">
<AlertTriangle className="size-3" />
Reconnect required
</Badge>
) : isConnected ? (
<Badge variant="default" className="gap-1">
<CheckCircle2 className="size-3" />
Connected
Expand All @@ -79,6 +85,7 @@ export default async function LinearIntegrationPage() {
<ConnectionControls
organizationId={organization.id}
isConnected={isConnected}
needsReconnect={needsReconnect}
/>
</CardContent>
</Card>
Expand Down
2 changes: 2 additions & 0 deletions packages/db/drizzle/0042_linear_disconnect_state.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE "integration_connections" ADD COLUMN "disconnected_at" timestamp;--> statement-breakpoint
ALTER TABLE "integration_connections" ADD COLUMN "disconnect_reason" text;
Comment on lines +1 to +2
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Regenerate this migration instead of hand-editing it.

packages/db/drizzle/* is supposed to be generated from the schema files. Checking in manual SQL here can drift from Drizzle’s journal/snapshot state and makes the next generate unreliable.

As per coding guidelines, "Create database migrations by modifying schema files in packages/db/src/schema/ and running bunx drizzle-kit generate --name=\"<sample_name_snake_case>\". Never manually edit migration files in packages/db/drizzle/, including .sql files, meta/_journal.json, and snapshot files."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/db/drizzle/0042_linear_disconnect_state.sql` around lines 1 - 2,
This migration was hand-edited; instead update the schema for the
"integration_connections" table (add the disconnected_at timestamp and
disconnect_reason text columns) under packages/db/src/schema/ and then
regenerate the migration rather than editing
packages/db/drizzle/0042_linear_disconnect_state.sql directly by running bunx
drizzle-kit generate --name="<descriptive_snake_case_name>"; commit the
generated migration so Drizzle's journal/snapshot remain in sync and remove the
manual SQL edit.

Loading
Loading