-
Notifications
You must be signed in to change notification settings - Fork 920
feat(integrations): add Linear integration with bidirectional sync #503
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
81395d8
7d5bd2c
f0b39aa
632ae13
337dcf5
877e116
fdecbf3
c468f93
83f46a4
4ae5445
1062f65
0a1f00a
02aff04
541bd6f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,109 @@ | ||
| import { LinearClient } from "@linear/sdk"; | ||
| import { db } from "@superset/db/client"; | ||
| import { integrationConnections } from "@superset/db/schema"; | ||
| import { Client } from "@upstash/qstash"; | ||
| import { z } from "zod"; | ||
| import { env } from "@/env"; | ||
|
|
||
| const qstash = new Client({ token: env.QSTASH_TOKEN }); | ||
|
|
||
| const stateSchema = z.object({ | ||
| organizationId: z.string().min(1), | ||
| userId: z.string().min(1), | ||
| }); | ||
|
|
||
| export async function GET(request: Request) { | ||
| const url = new URL(request.url); | ||
| const code = url.searchParams.get("code"); | ||
| const state = url.searchParams.get("state"); | ||
| const error = url.searchParams.get("error"); | ||
|
|
||
| if (error) { | ||
| return Response.redirect( | ||
| `${env.NEXT_PUBLIC_WEB_URL}/integrations/linear?error=oauth_denied`, | ||
| ); | ||
| } | ||
|
|
||
| if (!code || !state) { | ||
| return Response.redirect( | ||
| `${env.NEXT_PUBLIC_WEB_URL}/integrations/linear?error=missing_params`, | ||
| ); | ||
| } | ||
|
|
||
| const parsed = stateSchema.safeParse( | ||
| JSON.parse(Buffer.from(state, "base64url").toString("utf-8")), | ||
| ); | ||
|
|
||
| if (!parsed.success) { | ||
| return Response.redirect( | ||
| `${env.NEXT_PUBLIC_WEB_URL}/integrations/linear?error=invalid_state`, | ||
| ); | ||
| } | ||
|
|
||
| const { organizationId, userId } = parsed.data; | ||
|
|
||
| const tokenResponse = await fetch("https://api.linear.app/oauth/token", { | ||
| method: "POST", | ||
| headers: { "Content-Type": "application/x-www-form-urlencoded" }, | ||
| body: new URLSearchParams({ | ||
| grant_type: "authorization_code", | ||
| client_id: env.LINEAR_CLIENT_ID, | ||
| client_secret: env.LINEAR_CLIENT_SECRET, | ||
| redirect_uri: `${env.NEXT_PUBLIC_API_URL}/api/integrations/linear/callback`, | ||
| code, | ||
| }), | ||
| }); | ||
|
|
||
| if (!tokenResponse.ok) { | ||
| return Response.redirect( | ||
| `${env.NEXT_PUBLIC_WEB_URL}/integrations/linear?error=token_exchange_failed`, | ||
| ); | ||
| } | ||
|
|
||
| const tokenData: { access_token: string; expires_in?: number } = | ||
| await tokenResponse.json(); | ||
|
|
||
| const linearClient = new LinearClient({ | ||
| accessToken: tokenData.access_token, | ||
| }); | ||
| const viewer = await linearClient.viewer; | ||
| const linearOrg = await viewer.organization; | ||
|
|
||
| const tokenExpiresAt = tokenData.expires_in | ||
| ? new Date(Date.now() + tokenData.expires_in * 1000) | ||
| : null; | ||
|
|
||
| await db | ||
| .insert(integrationConnections) | ||
| .values({ | ||
| organizationId, | ||
| connectedByUserId: userId, | ||
| provider: "linear", | ||
| accessToken: tokenData.access_token, | ||
| tokenExpiresAt, | ||
| externalOrgId: linearOrg.id, | ||
| externalOrgName: linearOrg.name, | ||
| }) | ||
| .onConflictDoUpdate({ | ||
| target: [ | ||
| integrationConnections.organizationId, | ||
| integrationConnections.provider, | ||
| ], | ||
| set: { | ||
| accessToken: tokenData.access_token, | ||
| tokenExpiresAt, | ||
| externalOrgId: linearOrg.id, | ||
| externalOrgName: linearOrg.name, | ||
| connectedByUserId: userId, | ||
| updatedAt: new Date(), | ||
| }, | ||
| }); | ||
|
|
||
| await qstash.publishJSON({ | ||
| url: `${env.NEXT_PUBLIC_API_URL}/api/integrations/linear/jobs/initial-sync`, | ||
| body: { organizationId, creatorUserId: userId }, | ||
| retries: 3, | ||
| }); | ||
|
|
||
| return Response.redirect(`${env.NEXT_PUBLIC_WEB_URL}/integrations/linear`); | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| import { auth } from "@clerk/nextjs/server"; | ||
| import { db } from "@superset/db/client"; | ||
| import { organizationMembers, users } from "@superset/db/schema"; | ||
| import { and, eq } from "drizzle-orm"; | ||
| import { env } from "@/env"; | ||
|
|
||
| export async function GET(request: Request) { | ||
| const { userId: clerkUserId } = await auth(); | ||
|
|
||
| if (!clerkUserId) { | ||
| return Response.json({ error: "Unauthorized" }, { status: 401 }); | ||
| } | ||
|
|
||
| const url = new URL(request.url); | ||
| const organizationId = url.searchParams.get("organizationId"); | ||
|
|
||
| if (!organizationId) { | ||
| return Response.json( | ||
| { error: "Missing organizationId parameter" }, | ||
| { status: 400 }, | ||
| ); | ||
| } | ||
|
|
||
| const user = await db.query.users.findFirst({ | ||
| where: eq(users.clerkId, clerkUserId), | ||
| }); | ||
|
|
||
| if (!user) { | ||
| return Response.json({ error: "User not found" }, { status: 404 }); | ||
| } | ||
|
|
||
| const membership = await db.query.organizationMembers.findFirst({ | ||
| where: and( | ||
| eq(organizationMembers.organizationId, organizationId), | ||
| eq(organizationMembers.userId, user.id), | ||
| ), | ||
| }); | ||
|
|
||
| if (!membership) { | ||
| return Response.json( | ||
| { error: "User is not a member of this organization" }, | ||
| { status: 403 }, | ||
| ); | ||
| } | ||
|
|
||
| const state = Buffer.from( | ||
| JSON.stringify({ organizationId, userId: user.id }), | ||
| ).toString("base64url"); | ||
|
|
||
| const linearAuthUrl = new URL("https://linear.app/oauth/authorize"); | ||
| linearAuthUrl.searchParams.set("client_id", env.LINEAR_CLIENT_ID); | ||
| linearAuthUrl.searchParams.set( | ||
| "redirect_uri", | ||
| `${env.NEXT_PUBLIC_API_URL}/api/integrations/linear/callback`, | ||
| ); | ||
| linearAuthUrl.searchParams.set("response_type", "code"); | ||
| linearAuthUrl.searchParams.set("scope", "read,write,issues:create"); | ||
| linearAuthUrl.searchParams.set("state", state); | ||
|
|
||
| return Response.redirect(linearAuthUrl.toString()); | ||
| } |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,126 @@ | ||||||||||||||||||||||||||||||||
| import { LinearClient } from "@linear/sdk"; | ||||||||||||||||||||||||||||||||
| import { buildConflictUpdateColumns, db } from "@superset/db"; | ||||||||||||||||||||||||||||||||
| import { integrationConnections, tasks, users } from "@superset/db/schema"; | ||||||||||||||||||||||||||||||||
| import { Receiver } from "@upstash/qstash"; | ||||||||||||||||||||||||||||||||
| import { and, eq, inArray } from "drizzle-orm"; | ||||||||||||||||||||||||||||||||
| import chunk from "lodash.chunk"; | ||||||||||||||||||||||||||||||||
| import { z } from "zod"; | ||||||||||||||||||||||||||||||||
| import { env } from "@/env"; | ||||||||||||||||||||||||||||||||
| import { fetchAllIssues, mapIssueToTask } from "./utils"; | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| const BATCH_SIZE = 100; | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| const receiver = new Receiver({ | ||||||||||||||||||||||||||||||||
| currentSigningKey: env.QSTASH_CURRENT_SIGNING_KEY, | ||||||||||||||||||||||||||||||||
| nextSigningKey: env.QSTASH_NEXT_SIGNING_KEY, | ||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| const payloadSchema = z.object({ | ||||||||||||||||||||||||||||||||
| organizationId: z.string().min(1), | ||||||||||||||||||||||||||||||||
| creatorUserId: z.string().min(1), | ||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| export async function POST(request: Request) { | ||||||||||||||||||||||||||||||||
| const body = await request.text(); | ||||||||||||||||||||||||||||||||
| const signature = request.headers.get("upstash-signature"); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| if (!signature) { | ||||||||||||||||||||||||||||||||
| return Response.json({ error: "Missing signature" }, { status: 401 }); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| const isValid = await receiver.verify({ | ||||||||||||||||||||||||||||||||
| body, | ||||||||||||||||||||||||||||||||
| signature, | ||||||||||||||||||||||||||||||||
| url: `${env.NEXT_PUBLIC_API_URL}/api/integrations/linear/jobs/initial-sync`, | ||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| if (!isValid) { | ||||||||||||||||||||||||||||||||
| return Response.json({ error: "Invalid signature" }, { status: 401 }); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| const parsed = payloadSchema.safeParse(JSON.parse(body)); | ||||||||||||||||||||||||||||||||
| if (!parsed.success) { | ||||||||||||||||||||||||||||||||
| return Response.json({ error: "Invalid payload" }, { status: 400 }); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
Comment on lines
+41
to
+44
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wrap JSON.parse in try-catch to handle malformed payloads. Same issue as in sync-task route - 🔎 Proposed fix- const parsed = payloadSchema.safeParse(JSON.parse(body));
- if (!parsed.success) {
- return Response.json({ error: "Invalid payload" }, { status: 400 });
- }
+ let jsonBody: unknown;
+ try {
+ jsonBody = JSON.parse(body);
+ } catch {
+ return Response.json({ error: "Invalid JSON" }, { status: 400 });
+ }
+
+ const parsed = payloadSchema.safeParse(jsonBody);
+ if (!parsed.success) {
+ return Response.json({ error: "Invalid payload" }, { status: 400 });
+ }📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| const { organizationId, creatorUserId } = parsed.data; | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| const connection = await db.query.integrationConnections.findFirst({ | ||||||||||||||||||||||||||||||||
| where: and( | ||||||||||||||||||||||||||||||||
| eq(integrationConnections.organizationId, organizationId), | ||||||||||||||||||||||||||||||||
| eq(integrationConnections.provider, "linear"), | ||||||||||||||||||||||||||||||||
| ), | ||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| if (!connection) { | ||||||||||||||||||||||||||||||||
| return Response.json({ error: "No connection found", skipped: true }); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
Comment on lines
+55
to
+57
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add status code for "No connection found" response. The error response defaults to 200 OK, which is misleading. 🔎 Proposed fix if (!connection) {
- return Response.json({ error: "No connection found", skipped: true });
+ return Response.json({ error: "No connection found", skipped: true }, { status: 404 });
}📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| const client = new LinearClient({ accessToken: connection.accessToken }); | ||||||||||||||||||||||||||||||||
| await performInitialSync(client, organizationId, creatorUserId); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| return Response.json({ success: true }); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| async function performInitialSync( | ||||||||||||||||||||||||||||||||
| client: LinearClient, | ||||||||||||||||||||||||||||||||
| organizationId: string, | ||||||||||||||||||||||||||||||||
| creatorUserId: string, | ||||||||||||||||||||||||||||||||
| ) { | ||||||||||||||||||||||||||||||||
| const issues = await fetchAllIssues(client); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| if (issues.length === 0) { | ||||||||||||||||||||||||||||||||
| return; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| const assigneeEmails = [ | ||||||||||||||||||||||||||||||||
| ...new Set( | ||||||||||||||||||||||||||||||||
| issues.map((i) => i.assignee?.email).filter((e): e is string => !!e), | ||||||||||||||||||||||||||||||||
| ), | ||||||||||||||||||||||||||||||||
| ]; | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| const matchedUsers = | ||||||||||||||||||||||||||||||||
| assigneeEmails.length > 0 | ||||||||||||||||||||||||||||||||
| ? await db.query.users.findMany({ | ||||||||||||||||||||||||||||||||
| where: inArray(users.email, assigneeEmails), | ||||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||||
| : []; | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| const userByEmail = new Map(matchedUsers.map((u) => [u.email, u.id])); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| const taskValues = issues.map((issue) => | ||||||||||||||||||||||||||||||||
| mapIssueToTask(issue, organizationId, creatorUserId, userByEmail), | ||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| const batches = chunk(taskValues, BATCH_SIZE); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| for (const batch of batches) { | ||||||||||||||||||||||||||||||||
| await db | ||||||||||||||||||||||||||||||||
| .insert(tasks) | ||||||||||||||||||||||||||||||||
| .values(batch) | ||||||||||||||||||||||||||||||||
| .onConflictDoUpdate({ | ||||||||||||||||||||||||||||||||
| target: [tasks.externalProvider, tasks.externalId], | ||||||||||||||||||||||||||||||||
| set: { | ||||||||||||||||||||||||||||||||
| ...buildConflictUpdateColumns(tasks, [ | ||||||||||||||||||||||||||||||||
| "slug", | ||||||||||||||||||||||||||||||||
| "title", | ||||||||||||||||||||||||||||||||
| "description", | ||||||||||||||||||||||||||||||||
| "status", | ||||||||||||||||||||||||||||||||
| "statusColor", | ||||||||||||||||||||||||||||||||
| "statusType", | ||||||||||||||||||||||||||||||||
| "priority", | ||||||||||||||||||||||||||||||||
| "assigneeId", | ||||||||||||||||||||||||||||||||
| "estimate", | ||||||||||||||||||||||||||||||||
| "dueDate", | ||||||||||||||||||||||||||||||||
| "labels", | ||||||||||||||||||||||||||||||||
| "startedAt", | ||||||||||||||||||||||||||||||||
| "completedAt", | ||||||||||||||||||||||||||||||||
| "externalKey", | ||||||||||||||||||||||||||||||||
| "externalUrl", | ||||||||||||||||||||||||||||||||
| "lastSyncedAt", | ||||||||||||||||||||||||||||||||
| ]), | ||||||||||||||||||||||||||||||||
| syncError: null, | ||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrap JSON parsing in try-catch for malformed state.
If the
stateparameter is valid base64url but contains invalid JSON,JSON.parsewill throw an unhandled exception, resulting in a 500 error instead of a graceful redirect to the error page.🔎 Proposed fix
📝 Committable suggestion
🤖 Prompt for AI Agents