Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,14 @@ FREESTYLE_API_KEY=
# -----------------------------------------------------------------------------
# Streams (AI Chat Server)
# -----------------------------------------------------------------------------
# Desktop app / client-facing
# Clients (Desktop Web Mobile)
STREAMS_URL=http://localhost:8080
STREAMS_SECRET=

# Streams server internals
# Streams server internals (optional)
STREAMS_PORT=8080
STREAMS_INTERNAL_PORT=8081
STREAMS_INTERNAL_URL=http://127.0.0.1:8081
STREAMS_DATA_DIR=.data
STREAMS_INTERNAL_URL=http://localhost:8081
STREAMS_DATA_DIR=./data

# -----------------------------------------------------------------------------
# Sentry Error Tracking
Expand Down
4 changes: 2 additions & 2 deletions apps/desktop/src/renderer/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
- default-src 'self': Only allow resources from same origin
- script-src 'self' 'wasm-unsafe-eval' https://*.posthog.com: Allow scripts from same origin + WebAssembly (for xterm ImageAddon) + PostHog
- style-src 'self' 'unsafe-inline': Allow styles from same origin + inline (needed for CSS-in-JS)
- connect-src 'self' ws: wss: %NEXT_PUBLIC_API_URL% http://localhost:8080 http://localhost:8081 https://*.posthog.com https://*.sentry.io sentry-ipc:: Allow WebSocket + API (includes Electric proxy) + Durable Streams proxy + PostHog + Sentry
- connect-src 'self' ws: wss: %NEXT_PUBLIC_API_URL% %STREAMS_URL% https://*.posthog.com https://*.sentry.io sentry-ipc:: Allow WebSocket + API (includes Electric proxy) + Durable Streams proxy + PostHog + Sentry
- img-src 'self' data: %NEXT_PUBLIC_API_URL% https://*.public.blob.vercel-storage.com https://github.com https://avatars.githubusercontent.com https://models.dev: Allow images from same origin + data URIs + API (Linear image proxy) + Vercel blob storage + GitHub avatars + model provider logos
- font-src 'self': Allow fonts from same origin
-->
<meta http-equiv="Content-Security-Policy" content="default-src 'self'; script-src 'self' 'wasm-unsafe-eval' https://*.posthog.com; style-src 'self' 'unsafe-inline'; connect-src 'self' ws: wss: %NEXT_PUBLIC_API_URL% http://localhost:8080 http://localhost:8081 https://*.posthog.com https://*.sentry.io sentry-ipc:; img-src 'self' data: %NEXT_PUBLIC_API_URL% https://*.public.blob.vercel-storage.com https://github.com https://avatars.githubusercontent.com https://models.dev; font-src 'self';" />
<meta http-equiv="Content-Security-Policy" content="default-src 'self'; script-src 'self' 'wasm-unsafe-eval' https://*.posthog.com; style-src 'self' 'unsafe-inline'; connect-src 'self' ws: wss: %NEXT_PUBLIC_API_URL% %STREAMS_URL% https://*.posthog.com https://*.sentry.io sentry-ipc:; img-src 'self' data: %NEXT_PUBLIC_API_URL% https://*.public.blob.vercel-storage.com https://github.com https://avatars.githubusercontent.com https://models.dev; font-src 'self';" />
</head>

<body>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { StreamError } from "@superset/durable-session";
import { useDurableChat } from "@superset/durable-session/react";
import {
Conversation,
Expand Down Expand Up @@ -343,11 +344,16 @@ export function ChatInterface({

<div className="border-t bg-background px-4 py-3">
<div className="mx-auto w-full max-w-3xl">
{error && (
<div className="rounded-md border border-destructive/20 bg-destructive/10 px-4 py-2 text-sm text-destructive mb-3">
{error.message}
</div>
)}
{error &&
(() => {
const { message, code } = StreamError.friendly(error);
return (
<div className="select-text rounded-md border border-destructive/20 bg-destructive/10 px-4 py-2 text-sm text-destructive mb-3">
{message}
{code && <span className="ml-1 opacity-50">({code})</span>}
</div>
);
})()}
<PromptInputProvider>
<FileMentionProvider cwd={cwd}>
<SlashCommandInput
Expand Down
13 changes: 9 additions & 4 deletions apps/desktop/vite/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,15 @@ export function htmlEnvTransformPlugin(): Plugin {
return {
name: "html-env-transform",
transformIndexHtml(html) {
return html.replace(
/%NEXT_PUBLIC_API_URL%/g,
process.env.NEXT_PUBLIC_API_URL || "https://api.superset.sh",
);
return html
.replace(
/%NEXT_PUBLIC_API_URL%/g,
process.env.NEXT_PUBLIC_API_URL || "https://api.superset.sh",
)
.replace(
/%STREAMS_URL%/g,
process.env.STREAMS_URL || "https://superset-stream.fly.dev",
);
},
};
}
11 changes: 7 additions & 4 deletions apps/streams/src/env.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import { createEnv } from "@t3-oss/env-core";
import { z } from "zod";

const DEFAULT_PORT = 8080;
const DEFAULT_INTERNAL_PORT = 8081;

export const env = createEnv({
server: {
STREAMS_PORT: z.coerce.number(),
STREAMS_INTERNAL_PORT: z.coerce.number(),
STREAMS_INTERNAL_URL: z.string().url(),
STREAMS_DATA_DIR: z.string().min(1),
STREAMS_PORT: z.coerce.number().default(DEFAULT_PORT),
STREAMS_INTERNAL_PORT: z.coerce.number().default(DEFAULT_INTERNAL_PORT),
STREAMS_INTERNAL_URL: z.string().url().optional(),
STREAMS_DATA_DIR: z.string().min(1).default("./data"),
DATABASE_URL: z.string().url(),
},
clientPrefix: "PUBLIC_",
Expand Down
24 changes: 4 additions & 20 deletions apps/streams/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,9 @@
import { execSync } from "node:child_process";
import { existsSync, mkdirSync } from "node:fs";
import { DurableStreamTestServer } from "@durable-streams/server";
import { serve } from "@hono/node-server";
import { env } from "./env";
import { createServer } from "./server";

// Kill stale listeners left behind by dev server restarts
function freePort(port: number): void {
try {
const pid = execSync(`lsof -iTCP:${port} -sTCP:LISTEN -t`, {
encoding: "utf-8",
}).trim();
if (pid) {
process.kill(Number(pid), "SIGKILL");
console.log(`[streams] Killed stale process ${pid} on port ${port}`);
}
} catch {
// No process found on this port — nothing to do
}
}

freePort(env.STREAMS_PORT);
freePort(env.STREAMS_INTERNAL_PORT);

if (!existsSync(env.STREAMS_DATA_DIR)) {
mkdirSync(env.STREAMS_DATA_DIR, { recursive: true });
}
Expand All @@ -36,8 +17,11 @@ console.log(
`[streams] Durable stream server on port ${env.STREAMS_INTERNAL_PORT}`,
);

const internalUrl =
env.STREAMS_INTERNAL_URL ?? `http://localhost:${env.STREAMS_INTERNAL_PORT}`;

const { app } = createServer({
baseUrl: env.STREAMS_INTERNAL_URL,
baseUrl: internalUrl,
cors: true,
logging: true,
});
Expand Down
86 changes: 0 additions & 86 deletions docs/replace-streams-secret-with-session-auth.md

This file was deleted.

44 changes: 14 additions & 30 deletions packages/durable-session/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
createToolResultsCollection,
updateConnectionStatus,
} from "./collections";
import { StreamError } from "./errors";
import { extractTextContent, messageRowToUIMessage } from "./materialize";
import type {
ActorType,
Expand Down Expand Up @@ -131,7 +132,10 @@ export class DurableChatClient<
// ═══════════════════════════════════════════════════════════════════════

constructor(options: DurableChatClientOptions<TTools>) {
this.options = options;
this.options = {
...options,
proxyUrl: options.proxyUrl.replace(/\/+$/, ""),
};
this.sessionId = options.sessionId;
this.actorId = options.actorId ?? crypto.randomUUID();
this.actorType = options.actorType ?? "user";
Expand Down Expand Up @@ -335,8 +339,7 @@ export class DurableChatClient<
});

if (!response.ok) {
const errorText = await response.text();
throw new Error(`Request failed: ${response.status} ${errorText}`);
throw StreamError.fromResponse(response);
}
}

Expand Down Expand Up @@ -381,17 +384,12 @@ export class DurableChatClient<
});
}

stop(): void {
fetch(`${this.options.proxyUrl}/v1/sessions/${this.sessionId}/stop`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ messageId: null }), // null = stop all
}).catch((err) => {
console.warn("Failed to stop generation:", err);
async stop(): Promise<void> {
await this.postToProxy(`/v1/sessions/${this.sessionId}/stop`, {
messageId: null,
});
}
Comment on lines +387 to 391
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

stop() lacks the connection guard and error-state handling present in other mutation methods.

Every other public mutation (sendMessage, addToolResult, addToolApprovalResponse, addToolAnswerResponse) checks this._isConnected before proceeding and routes errors through executeAction (which sets this._error and calls this.options.onError). stop() skips both.

If stop() is called on a disconnected client, the fetch will likely fail with an unhelpful network error. And on failure, this._error won't be set at the client level (though the React hook does catch it separately).

Proposed fix
 async stop(): Promise<void> {
+  if (!this._isConnected) {
+    throw new Error("Client not connected. Call connect() first.");
+  }
-  await this.postToProxy(`/v1/sessions/${this.sessionId}/stop`, {
-    messageId: null,
-  });
+  try {
+    await this.postToProxy(`/v1/sessions/${this.sessionId}/stop`, {
+      messageId: null,
+    });
+  } catch (error) {
+    this._error = error instanceof Error ? error : new Error(String(error));
+    this.options.onError?.(this._error);
+    throw error;
+  }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async stop(): Promise<void> {
await this.postToProxy(`/v1/sessions/${this.sessionId}/stop`, {
messageId: null,
});
}
async stop(): Promise<void> {
if (!this._isConnected) {
throw new Error("Client not connected. Call connect() first.");
}
try {
await this.postToProxy(`/v1/sessions/${this.sessionId}/stop`, {
messageId: null,
});
} catch (error) {
this._error = error instanceof Error ? error : new Error(String(error));
this.options.onError?.(this._error);
throw error;
}
}
🤖 Prompt for AI Agents
In `@packages/durable-session/src/client.ts` around lines 384 - 388, The stop()
method currently calls postToProxy directly and lacks the connection guard and
error-state handling used by other mutation methods; update stop() to first
check this._isConnected and then perform the POST via this.executeAction so any
failures set this._error and invoke this.options.onError (i.e., wrap the call to
this.postToProxy(`/v1/sessions/${this.sessionId}/stop`, { messageId: null })
inside this.executeAction and ensure the method short-circuits when
!this._isConnected).


/** Local-only clear — does not affect the durable stream. */
clear(): void {
this.options.onMessagesChange?.([]);
}
Expand Down Expand Up @@ -589,10 +587,7 @@ export class DurableChatClient<
);

if (!response.ok) {
const errorText = await response.text();
throw new Error(
`Failed to fork session: ${response.status} ${errorText}`,
);
throw StreamError.fromResponse(response);
}

return (await response.json()) as ForkResult;
Expand All @@ -614,10 +609,7 @@ export class DurableChatClient<
);

if (!response.ok) {
const errorText = await response.text();
throw new Error(
`Failed to register agents: ${response.status} ${errorText}`,
);
throw StreamError.fromResponse(response);
}
}

Expand All @@ -635,10 +627,7 @@ export class DurableChatClient<
);

if (!response.ok) {
const errorText = await response.text();
throw new Error(
`Failed to unregister agent: ${response.status} ${errorText}`,
);
throw StreamError.fromResponse(response);
}
}

Expand All @@ -660,7 +649,6 @@ export class DurableChatClient<
updateConnectionStatus(meta, "connecting"),
);

// Skip server call in test mode (injected sessionDB)
if (!this.options.sessionDB) {
const response = await fetch(
`${this.options.proxyUrl}/v1/sessions/${this.sessionId}`,
Expand All @@ -671,12 +659,8 @@ export class DurableChatClient<
},
);

if (
!response.ok &&
response.status !== 200 &&
response.status !== 201
) {
throw new Error(`Failed to create session: ${response.status}`);
if (!response.ok) {
throw StreamError.fromResponse(response);
}
}

Expand Down
53 changes: 53 additions & 0 deletions packages/durable-session/src/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
const FRIENDLY_MESSAGES: Record<number, string> = {
401: "Your session has expired. Please sign in again.",
403: "You don't have permission to access this chat.",
404: "Chat session not found. It may have been deleted.",
429: "Too many requests. Please wait a moment and try again.",
500: "Something went wrong on our end. Please try again.",
502: "Chat server is temporarily unavailable. Please try again.",
503: "Chat server is temporarily unavailable. Please try again.",
};

const NETWORK_MESSAGE =
"Unable to connect to the chat server. Check your internet connection.";

export class StreamError extends Error {
readonly status: number;
readonly friendlyMessage: string;

constructor(status: number, detail?: string) {
const friendly =
FRIENDLY_MESSAGES[status] ?? `Unexpected error (${status})`;
super(detail ?? friendly);
this.name = "StreamError";
this.status = status;
this.friendlyMessage = friendly;
}

static fromResponse(response: Response): StreamError {
return new StreamError(response.status);
}

static friendly(error: unknown): { message: string; code: string | null } {
if (error instanceof StreamError) {
return {
message: error.friendlyMessage,
code: error.status > 0 ? `HTTP ${error.status}` : "NETWORK_ERROR",
};
}
if (error instanceof TypeError && error.message.includes("fetch")) {
return { message: NETWORK_MESSAGE, code: "NETWORK_ERROR" };
}
if (error instanceof Error) {
if (error.message.includes("Content Security Policy")) {
return {
message:
"Connection blocked by security policy. The chat server URL may not be allowed.",
code: "CSP_VIOLATION",
};
}
return { message: error.message, code: null };
}
return { message: "An unexpected error occurred.", code: "UNKNOWN" };
}
}
Loading