Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,8 @@ apps/desktop/resources/bin/win32-x64/
# Streams data
apps/streams/data/

# Wrangler
.wrangler

# Generated by setup.sh
Caddyfile
7 changes: 4 additions & 3 deletions apps/api/src/app/api/electric/[...path]/utils.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { db } from "@superset/db/client";
import {
agentCommands,
apikeys,
devicePresence,
integrationConnections,
invitations,
Expand Down Expand Up @@ -109,8 +108,10 @@ export async function buildWhereClause(
case "agent_commands":
return build(agentCommands, agentCommands.organizationId, organizationId);

case "auth.apikeys":
return build(apikeys, apikeys.userId, userId);
case "auth.apikeys": {
const fragment = `"metadata"::jsonb->>'organizationId' = $1`;
return { fragment, params: [organizationId] };
}

case "integration_connections":
return build(
Expand Down
4 changes: 4 additions & 0 deletions apps/desktop/electron.vite.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ export default defineConfig({
process.env.NEXT_PUBLIC_WEB_URL,
"https://app.superset.sh",
),
"process.env.NEXT_PUBLIC_ELECTRIC_URL": defineEnv(
process.env.NEXT_PUBLIC_ELECTRIC_URL,
"https://electric.superset.sh",
),
"process.env.NEXT_PUBLIC_DOCS_URL": defineEnv(
process.env.NEXT_PUBLIC_DOCS_URL,
"https://docs.superset.sh",
Expand Down
2 changes: 2 additions & 0 deletions apps/desktop/src/renderer/env.renderer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const envSchema = z.object({
.default("development"),
NEXT_PUBLIC_API_URL: z.url().default("https://api.superset.sh"),
NEXT_PUBLIC_WEB_URL: z.url().default("https://app.superset.sh"),
NEXT_PUBLIC_ELECTRIC_URL: z.url().default("https://electric.superset.sh"),
NEXT_PUBLIC_POSTHOG_KEY: z.string().optional(),
NEXT_PUBLIC_POSTHOG_HOST: z.string().default("https://us.i.posthog.com"),
SENTRY_DSN_DESKTOP: z.string().optional(),
Expand All @@ -33,6 +34,7 @@ const rawEnv = {
NODE_ENV: process.env.NODE_ENV,
NEXT_PUBLIC_API_URL: process.env.NEXT_PUBLIC_API_URL,
NEXT_PUBLIC_WEB_URL: process.env.NEXT_PUBLIC_WEB_URL,
NEXT_PUBLIC_ELECTRIC_URL: process.env.NEXT_PUBLIC_ELECTRIC_URL,
NEXT_PUBLIC_POSTHOG_KEY: import.meta.env.NEXT_PUBLIC_POSTHOG_KEY as
| string
| undefined,
Expand Down
4 changes: 2 additions & 2 deletions apps/desktop/src/renderer/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
- default-src 'self': Only allow resources from same origin
- script-src 'self' 'wasm-unsafe-eval' https://*.posthog.com: Allow scripts from same origin + WebAssembly (for xterm ImageAddon) + PostHog
- style-src 'self' 'unsafe-inline': Allow styles from same origin + inline (needed for CSS-in-JS)
- connect-src 'self' ws: wss: %NEXT_PUBLIC_API_URL% %NEXT_PUBLIC_STREAMS_URL% https://*.posthog.com https://*.sentry.io sentry-ipc:: Allow WebSocket + API + Streams server + PostHog + Sentry
- connect-src 'self' ws: wss: %NEXT_PUBLIC_API_URL% %NEXT_PUBLIC_ELECTRIC_URL% %NEXT_PUBLIC_STREAMS_URL% https://*.posthog.com https://*.sentry.io sentry-ipc:: Allow WebSocket + API + Electric worker + Streams server + PostHog + Sentry
- img-src 'self' data: %NEXT_PUBLIC_API_URL% https://*.public.blob.vercel-storage.com https://github.com https://avatars.githubusercontent.com https://models.dev: Allow images from same origin + data URIs + API (Linear image proxy) + Vercel blob storage + GitHub avatars + model provider logos
- font-src 'self': Allow fonts from same origin
-->
<meta http-equiv="Content-Security-Policy" content="default-src 'self'; script-src 'self' 'wasm-unsafe-eval' https://*.posthog.com; style-src 'self' 'unsafe-inline'; connect-src 'self' ws: wss: %NEXT_PUBLIC_API_URL% %NEXT_PUBLIC_STREAMS_URL% https://*.posthog.com https://*.sentry.io sentry-ipc:; img-src 'self' data: %NEXT_PUBLIC_API_URL% https://*.public.blob.vercel-storage.com https://github.com https://avatars.githubusercontent.com https://models.dev; font-src 'self';" />
<meta http-equiv="Content-Security-Policy" content="default-src 'self'; script-src 'self' 'wasm-unsafe-eval' https://*.posthog.com; style-src 'self' 'unsafe-inline'; connect-src 'self' ws: wss: %NEXT_PUBLIC_API_URL% %NEXT_PUBLIC_ELECTRIC_URL% %NEXT_PUBLIC_STREAMS_URL% https://*.posthog.com https://*.sentry.io sentry-ipc:; img-src 'self' data: %NEXT_PUBLIC_API_URL% https://*.public.blob.vercel-storage.com https://github.com https://avatars.githubusercontent.com https://models.dev; font-src 'self';" />
</head>

<body>
Expand Down
2 changes: 2 additions & 0 deletions apps/desktop/src/renderer/lib/auth-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { auth } from "@superset/auth/server";
import {
apiKeyClient,
customSessionClient,
jwtClient,
organizationClient,
} from "better-auth/client/plugins";
import { createAuthClient } from "better-auth/react";
Expand Down Expand Up @@ -31,6 +32,7 @@ export const authClient = createAuthClient({
customSessionClient<typeof auth>(),
stripeClient({ subscription: true }),
apiKeyClient(),
jwtClient(),
],
fetchOptions: {
credentials: "include",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ import { electronTrpc } from "../../lib/electron-trpc";
* 1. Load token from disk on mount
* 2. If valid (not expired), set in memory and validate session in background
* 3. Render children immediately without blocking on network
*
* Electric JWT tokens are fetched on-demand via async headers in collections.ts
* using authClient.token() from better-auth's JWT plugin.
*/
export function AuthProvider({ children }: { children: ReactNode }) {
const [isHydrated, setIsHydrated] = useState(false);

// Get session refetch to bust cache when token changes
const { refetch: refetchSession } = authClient.useSession();

// Initial hydration: Load token from disk
const { data: storedToken, isSuccess } =
electronTrpc.auth.getStoredToken.useQuery(undefined, {
refetchOnWindowFocus: false,
Expand All @@ -38,25 +38,21 @@ export function AuthProvider({ children }: { children: ReactNode }) {
setIsHydrated(true);
}, [storedToken, isSuccess, isHydrated, refetchSession]);

// Listen for auth events from main process (new auth or sign-out only, not hydration)
electronTrpc.auth.onTokenChanged.useSubscription(undefined, {
onData: async (data) => {
if (data?.token && data?.expiresAt) {
// New authentication - clear old session state first, then set new token
setAuthToken(null);
await authClient.signOut({ fetchOptions: { throw: false } });
setAuthToken(data.token);
setIsHydrated(true);
refetchSession();
} else if (data === null) {
// Sign-out
setAuthToken(null);
refetchSession();
}
},
});

// Show loading spinner until initial hydration completes
if (!isHydrated) {
return (
<div className="flex h-screen w-screen items-center justify-center bg-background">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,22 @@ import type { Collection } from "@tanstack/react-db";
import { createCollection } from "@tanstack/react-db";
import { createTRPCProxyClient, httpBatchLink } from "@trpc/client";
import { env } from "renderer/env.renderer";
import { getAuthToken } from "renderer/lib/auth-client";
import { authClient, getAuthToken } from "renderer/lib/auth-client";
import superjson from "superjson";
import { z } from "zod";

const columnMapper = snakeCamelMapper();
const electricUrl = `${env.NEXT_PUBLIC_API_URL}/api/electric/v1/shape`;
const electricUrl = `${env.NEXT_PUBLIC_ELECTRIC_URL}/v1/shape`;

const apiKeyDisplaySchema = z.object({
id: z.string(),
name: z.string().nullable(),
start: z.string().nullable(),
createdAt: z.coerce.date(),
lastRequest: z.coerce.date().nullable(),
});

type ApiKeyDisplay = z.infer<typeof apiKeyDisplaySchema>;

interface OrgCollections {
tasks: Collection<SelectTask>;
Expand All @@ -36,6 +46,7 @@ interface OrgCollections {
devicePresence: Collection<SelectDevicePresence>;
integrationConnections: Collection<SelectIntegrationConnection>;
subscriptions: Collection<SelectSubscription>;
apiKeys: Collection<ApiKeyDisplay>;
}

// Per-org collections cache
Expand All @@ -62,37 +73,9 @@ const organizationsCollection = createCollection(
url: electricUrl,
params: { table: "auth.organizations" },
headers: {
Authorization: () => {
const token = getAuthToken();
return token ? `Bearer ${token}` : "";
},
},
columnMapper,
},
getKey: (item) => item.id,
}),
);

const apiKeyDisplaySchema = z.object({
id: z.string(),
name: z.string().nullable(),
start: z.string().nullable(),
createdAt: z.coerce.date(),
lastRequest: z.coerce.date().nullable(),
});

type ApiKeyDisplay = z.infer<typeof apiKeyDisplaySchema>;

const apiKeysCollection = createCollection(
electricCollectionOptions<ApiKeyDisplay>({
id: "apikeys",
shapeOptions: {
url: electricUrl,
params: { table: "auth.apikeys" },
headers: {
Authorization: () => {
const token = getAuthToken();
return token ? `Bearer ${token}` : "";
Authorization: async () => {
const { data } = await authClient.token();
return data?.token ? `Bearer ${data.token}` : "";
},
Comment on lines +76 to 79
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Unhandled rejection from authClient.token() will permanently kill the shape stream.

If authClient.token() rejects (e.g., network failure, auth server unreachable), the async header function throws and the Electric stream has no way to recover. Wrap in try/catch and return "" (or the stale token via getAuthToken()) as a fallback so the stream can still attempt the request and hit the onError retry path.

♻️ Suggested helper extraction (also reduces duplication)
+async function getElectricAuthHeader(): Promise<string> {
+	try {
+		const { data } = await authClient.token();
+		if (data?.token) return `Bearer ${data.token}`;
+	} catch (e) {
+		console.error("[electric/auth] token fetch failed", e);
+	}
+	// Fall back to session bearer token so the request still fires
+	const fallback = getAuthToken();
+	return fallback ? `Bearer ${fallback}` : "";
+}
+
 const organizationsCollection = createCollection(
 	electricCollectionOptions<SelectOrganization>({
 		id: "organizations",
 		shapeOptions: {
 			url: electricUrl,
 			params: { table: "auth.organizations" },
 			headers: {
-				Authorization: async () => {
-					const { data } = await authClient.token();
-					return data?.token ? `Bearer ${data.token}` : "";
-				},
+				Authorization: getElectricAuthHeader,
 			},
 			columnMapper,
 		},
 		getKey: (item) => item.id,
 	}),
 );
 
 function createOrgCollections(organizationId: string): OrgCollections {
 	const headers = {
-		Authorization: async () => {
-			const { data } = await authClient.token();
-			return data?.token ? `Bearer ${data.token}` : "";
-		},
+		Authorization: getElectricAuthHeader,
 	};

Also applies to: 89-92

🤖 Prompt for AI Agents
In
`@apps/desktop/src/renderer/routes/_authenticated/providers/CollectionsProvider/collections.ts`
around lines 76 - 79, The async Authorization header function currently calls
authClient.token() without error handling so a rejection will throw and kill the
Electric stream; wrap the call to authClient.token() in try/catch and on error
return a safe fallback (either an empty string "" or the stale token from
getAuthToken()) instead of letting the exception propagate; update the same
pattern in the other Authorization header occurrence (the second async header
block) so both places use the try/catch fallback to allow the stream to hit its
onError/retry path.

},
columnMapper,
Expand All @@ -103,9 +86,9 @@ const apiKeysCollection = createCollection(

function createOrgCollections(organizationId: string): OrgCollections {
const headers = {
Authorization: () => {
const token = getAuthToken();
return token ? `Bearer ${token}` : "";
Authorization: async () => {
const { data } = await authClient.token();
return data?.token ? `Bearer ${data.token}` : "";
},
};

Expand Down Expand Up @@ -313,6 +296,22 @@ function createOrgCollections(organizationId: string): OrgCollections {
}),
);

const apiKeys = createCollection(
electricCollectionOptions<ApiKeyDisplay>({
id: `apikeys-${organizationId}`,
shapeOptions: {
url: electricUrl,
params: {
table: "auth.apikeys",
organizationId,
},
headers,
columnMapper,
},
getKey: (item) => item.id,
}),
);

return {
tasks,
taskStatuses,
Expand All @@ -324,6 +323,7 @@ function createOrgCollections(organizationId: string): OrgCollections {
devicePresence,
integrationConnections,
subscriptions,
apiKeys,
};
}

Expand All @@ -335,8 +335,7 @@ function createOrgCollections(organizationId: string): OrgCollections {
export async function preloadCollections(
organizationId: string,
): Promise<void> {
const { organizations, apiKeys, ...orgCollections } =
getCollections(organizationId);
const { organizations, ...orgCollections } = getCollections(organizationId);
await Promise.allSettled(
Object.values(orgCollections).map((c) =>
(c as Collection<object>).preload(),
Expand All @@ -363,6 +362,5 @@ export function getCollections(organizationId: string) {
return {
...orgCollections,
organizations: organizationsCollection,
apiKeys: apiKeysCollection,
};
}
5 changes: 5 additions & 0 deletions apps/desktop/vite/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ export function htmlEnvTransformPlugin(): Plugin {
/%NEXT_PUBLIC_API_URL%/g,
process.env.NEXT_PUBLIC_API_URL || "https://api.superset.sh",
)
.replace(
/%NEXT_PUBLIC_ELECTRIC_URL%/g,
process.env.NEXT_PUBLIC_ELECTRIC_URL ||
"https://electric.superset.sh",
)
.replace(
/%NEXT_PUBLIC_STREAMS_URL%/g,
process.env.NEXT_PUBLIC_STREAMS_URL || "https://streams.superset.sh",
Expand Down
20 changes: 20 additions & 0 deletions apps/electric-proxy/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"name": "@superset/electric-proxy",
"version": "0.1.0",
"private": true,
"type": "module",
"scripts": {
"dev": "wrangler dev",
"build": "wrangler deploy --dry-run --outdir=dist",
"deploy": "wrangler deploy",
"typecheck": "tsc --noEmit"
},
"dependencies": {
"jose": "^6.1.3"
},
"devDependencies": {
"@cloudflare/workers-types": "^4.20250214.0",
"typescript": "^5.9.3",
"wrangler": "^4.14.4"
}
}
40 changes: 40 additions & 0 deletions apps/electric-proxy/src/auth.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { createRemoteJWKSet, jwtVerify } from "jose";

interface VerifiedClaims {
organizationIds: string[];
}

let jwks: ReturnType<typeof createRemoteJWKSet> | null = null;

function getJWKS(jwksUrl: string): ReturnType<typeof createRemoteJWKSet> {
if (!jwks) {
jwks = createRemoteJWKSet(new URL(jwksUrl));
}
return jwks;
}

export async function verifyJWT({
token,
jwksUrl,
issuer,
audience,
}: {
token: string;
jwksUrl: string;
issuer: string;
audience: string;
}): Promise<VerifiedClaims> {
const keySet = getJWKS(jwksUrl);

const { payload } = await jwtVerify(token, keySet, {
issuer,
audience,
});

const organizationIds = payload.organizationIds;
if (!Array.isArray(organizationIds)) {
throw new Error("Missing organizationIds claim");
}

return { organizationIds: organizationIds as string[] };
}
Loading
Loading