Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
93b7541
Reduce stream latency
Kitenite Feb 11, 2026
2c2434c
Smooth stream
Kitenite Feb 11, 2026
e4ff696
Smooth stream
Kitenite Feb 11, 2026
d0c47b4
fix(streams): fail /generations/finish when producer had background e…
Kitenite Feb 11, 2026
380e509
refactor(streams): extract producer error helpers for clarity
Kitenite Feb 11, 2026
27d421c
fix(desktop): check res.ok for /generations/finish and send messageId
Kitenite Feb 11, 2026
1d1bd45
fix(streams): await producer flush and detach in deleteSession
Kitenite Feb 11, 2026
01ce87c
fix(streams): flush producer before reset control event
Kitenite Feb 11, 2026
87a7994
fix(streams): route all writes through producer for global ordering
Kitenite Feb 11, 2026
d5858bc
fix(desktop): add abort signal to chunk POSTs for fast interrupt
Kitenite Feb 11, 2026
be91b6c
fix(desktop): emit error event when generation finish fails
Kitenite Feb 11, 2026
99493d2
fix(streams): guard session delete/reset with per-session mutex
Kitenite Feb 11, 2026
1a21243
fix(streams): write user messages directly to stream for txid immediacy
Kitenite Feb 11, 2026
7481c20
perf(desktop): remove /generations/start round trip
Kitenite Feb 11, 2026
4378d16
perf: batch chunk sends to reduce per-chunk HTTP overhead
Kitenite Feb 11, 2026
d89b779
Update docs
Kitenite Feb 11, 2026
28347db
perf(streams): tune producer lingerMs and add flush timeout
Kitenite Feb 11, 2026
00c5e61
perf: skip Zod on batch endpoint, add bounded queue to ChunkBatcher
Kitenite Feb 11, 2026
af42487
Add retry with exponential backoff for batch sends (#21)
Kitenite Feb 11, 2026
aeb1aed
Add producer health tracking with sync fallback (#23, #24)
Kitenite Feb 11, 2026
fc6197e
Track active generation per session for single-writer enforcement (#2…
Kitenite Feb 11, 2026
ab271f9
Add sessionId and messageId to all route responses (#34)
Kitenite Feb 11, 2026
056af58
Add structured error codes to all route responses (#32)
Kitenite Feb 11, 2026
2050c5a
Add structured error codes and sessionId to auth routes (#32, #34)
Kitenite Feb 11, 2026
cbe05da
Format: lint fixes across streams and desktop
Kitenite Feb 11, 2026
1679647
Update recommendations doc: mark completed items
Kitenite Feb 11, 2026
60b9f0c
Add perf
Kitenite Feb 11, 2026
a891e1a
Chunks
Kitenite Feb 11, 2026
963475c
Remove /generations/start endpoint (#29)
Kitenite Feb 11, 2026
1075c38
Document terminal semantics convention (#31)
Kitenite Feb 11, 2026
329d6c1
Fix restore
Kitenite Feb 11, 2026
e9d0ac5
Refactor
Kitenite Feb 11, 2026
417dadc
More splitting
Kitenite Feb 11, 2026
88172aa
More split
Kitenite Feb 11, 2026
edf0bea
Fixed feedback
Kitenite Feb 11, 2026
e526c47
Chunk
Kitenite Feb 11, 2026
838c515
Fixed comments
Kitenite Feb 11, 2026
b3f660c
Update CI env
Kitenite Feb 11, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/deploy-preview.yml
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ jobs:
STRIPE_PRO_MONTHLY_PRICE_ID: ${{ secrets.STRIPE_PRO_MONTHLY_PRICE_ID }}
STRIPE_PRO_YEARLY_PRICE_ID: ${{ secrets.STRIPE_PRO_YEARLY_PRICE_ID }}
SLACK_BILLING_WEBHOOK_URL: ${{ secrets.SLACK_BILLING_WEBHOOK_URL }}
STREAMS_URL: ${{ env.STREAMS_URL }}
run: |
vercel pull --yes --environment=preview --token=$VERCEL_TOKEN
vercel build --token=$VERCEL_TOKEN
Expand Down Expand Up @@ -295,7 +296,8 @@ jobs:
--env STRIPE_WEBHOOK_SECRET=$STRIPE_WEBHOOK_SECRET \
--env STRIPE_PRO_MONTHLY_PRICE_ID=$STRIPE_PRO_MONTHLY_PRICE_ID \
--env STRIPE_PRO_YEARLY_PRICE_ID=$STRIPE_PRO_YEARLY_PRICE_ID \
--env SLACK_BILLING_WEBHOOK_URL=$SLACK_BILLING_WEBHOOK_URL)
--env SLACK_BILLING_WEBHOOK_URL=$SLACK_BILLING_WEBHOOK_URL \
--env STREAMS_URL=$STREAMS_URL)
vercel alias $VERCEL_URL ${{ env.API_ALIAS }} --scope=$VERCEL_ORG_ID --token=$VERCEL_TOKEN
echo "vercel_url=$VERCEL_URL" >> $GITHUB_OUTPUT

Expand Down
4 changes: 3 additions & 1 deletion .github/workflows/deploy-production.yml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ jobs:
STRIPE_PRO_MONTHLY_PRICE_ID: ${{ secrets.STRIPE_PRO_MONTHLY_PRICE_ID }}
STRIPE_PRO_YEARLY_PRICE_ID: ${{ secrets.STRIPE_PRO_YEARLY_PRICE_ID }}
SLACK_BILLING_WEBHOOK_URL: ${{ secrets.SLACK_BILLING_WEBHOOK_URL }}
STREAMS_URL: ${{ secrets.STREAMS_URL }}
run: |
vercel pull --yes --environment=production --token=$VERCEL_TOKEN
vercel build --prod --token=$VERCEL_TOKEN
Expand Down Expand Up @@ -154,7 +155,8 @@ jobs:
--env STRIPE_WEBHOOK_SECRET=$STRIPE_WEBHOOK_SECRET \
--env STRIPE_PRO_MONTHLY_PRICE_ID=$STRIPE_PRO_MONTHLY_PRICE_ID \
--env STRIPE_PRO_YEARLY_PRICE_ID=$STRIPE_PRO_YEARLY_PRICE_ID \
--env SLACK_BILLING_WEBHOOK_URL=$SLACK_BILLING_WEBHOOK_URL
--env SLACK_BILLING_WEBHOOK_URL=$SLACK_BILLING_WEBHOOK_URL \
--env STREAMS_URL=$STREAMS_URL

deploy-web:
name: Deploy Web to Vercel
Expand Down
35 changes: 23 additions & 12 deletions apps/desktop/src/lib/trpc/routers/ai-chat/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ interface CommandEntry {
argumentHint: string;
}

const permissionModeSchema = z.enum([
"default",
"acceptEdits",
"bypassPermissions",
]);

function scanCustomCommands(cwd: string): CommandEntry[] {
const dirs = [
join(cwd, ".claude", "commands"),
Expand Down Expand Up @@ -79,7 +85,7 @@ export const createAiChatRouter = () => {
paneId: z.string().optional(),
tabId: z.string().optional(),
model: z.string().optional(),
permissionMode: z.string().optional(),
permissionMode: permissionModeSchema.optional(),
}),
)
.mutation(async ({ input }) => {
Expand All @@ -103,7 +109,7 @@ export const createAiChatRouter = () => {
paneId: z.string().optional(),
tabId: z.string().optional(),
model: z.string().optional(),
permissionMode: z.string().optional(),
permissionMode: permissionModeSchema.optional(),
}),
)
.mutation(async ({ input }) => {
Expand Down Expand Up @@ -151,10 +157,7 @@ export const createAiChatRouter = () => {
sessionId: z.string(),
maxThinkingTokens: z.number().nullable().optional(),
model: z.string().nullable().optional(),
permissionMode: z
.enum(["default", "acceptEdits", "bypassPermissions"])
.nullable()
.optional(),
permissionMode: permissionModeSchema.nullable().optional(),
}),
)
.mutation(async ({ input }) => {
Expand All @@ -175,8 +178,9 @@ export const createAiChatRouter = () => {
}),
)
.mutation(async ({ input }) => {
await chatSessionManager.updateSessionMeta(input.sessionId, {
title: input.title,
await chatSessionManager.updateSessionMeta({
sessionId: input.sessionId,
patch: { title: input.title },
});
return { success: true };
}),
Expand Down Expand Up @@ -229,10 +233,17 @@ export const createAiChatRouter = () => {
.input(z.object({ sessionId: z.string(), text: z.string() }))
.mutation(({ input }) => {
// Fire-and-forget: agent runs in background, errors surface via streamEvents
chatSessionManager.startAgent({
sessionId: input.sessionId,
prompt: input.text,
});
void chatSessionManager
.startAgent({
sessionId: input.sessionId,
prompt: input.text,
})
.catch((error: unknown) => {
console.error(
"[ai-chat/sendMessage] Failed to start agent:",
error,
);
});
return { success: true };
}),

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import { join } from "node:path";
import {
createPermissionRequest,
executeAgent,
resolvePendingPermission,
} from "@superset/agent";
import { app } from "electron";
import { buildClaudeEnv } from "../auth";
import type { SessionStore } from "../session-store";
import type { PermissionRequestEvent } from "./session-events";
import type { ActiveSession } from "./session-types";

function getClaudeBinaryPath(): string {
if (app.isPackaged) {
return join(process.resourcesPath, "bin", "claude");
}
const platform = process.platform;
const arch = process.arch;
return join(
app.getAppPath(),
"resources",
"bin",
`${platform}-${arch}`,
"claude",
);
}

export interface ResolvePermissionInput {
sessionId: string;
toolUseId: string;
approved: boolean;
updatedInput?: Record<string, unknown>;
}

export interface ExecuteAgentInput {
session: ActiveSession;
sessionId: string;
prompt: string;
abortController: AbortController;
onChunk: (chunk: unknown) => void;
}

interface AgentExecutionDeps {
store: SessionStore;
emitPermissionRequest: (event: PermissionRequestEvent) => void;
}

export class AgentExecution {
constructor(private readonly deps: AgentExecutionDeps) {}

async execute({
session,
sessionId,
prompt,
abortController,
onChunk,
}: ExecuteAgentInput): Promise<void> {
const agentEnv = buildClaudeEnv();

await executeAgent({
sessionId,
prompt,
cwd: session.cwd,
pathToClaudeCodeExecutable: getClaudeBinaryPath(),
env: agentEnv,
model: session.model,
permissionMode: session.permissionMode ?? "default",
maxThinkingTokens: session.maxThinkingTokens,
signal: abortController.signal,
onChunk,
onPermissionRequest: async (params) => {
this.deps.emitPermissionRequest({
type: "permission_request",
sessionId,
toolUseId: params.toolUseId,
toolName: params.toolName,
input: params.input,
});

return createPermissionRequest({
toolUseId: params.toolUseId,
signal: params.signal,
});
},
onEvent: (event) => {
if (event.type === "session_initialized") {
this.deps.store
.update(sessionId, {
providerSessionId: event.claudeSessionId,
lastActiveAt: Date.now(),
})
.catch((err: unknown) => {
console.error(
`[chat/session] Failed to update providerSessionId:`,
err,
);
});
}
},
});
}

resolvePermission({
sessionId,
toolUseId,
approved,
updatedInput,
}: ResolvePermissionInput): void {
const result = approved
? {
behavior: "allow" as const,
updatedInput: updatedInput ?? {},
}
: { behavior: "deny" as const, message: "User denied permission" };

const resolved = resolvePendingPermission({ toolUseId, result });
if (!resolved) {
console.warn(
`[chat/session] No pending permission for toolUseId=${toolUseId} in session ${sessionId}`,
);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import type { SessionStore } from "../session-store";
import type { ResolvePermissionInput } from "./agent-execution";
import { AgentExecution } from "./agent-execution";
import { AgentStreamWriter } from "./agent-stream-writer";
import type { ChunkBatcher } from "./chunk-batcher";
import type { GenerationWatchdog } from "./generation-watchdog";
import type { PermissionRequestEvent } from "./session-events";
import type { ActiveSession, EnsureSessionReadyInput } from "./session-types";

export type { ResolvePermissionInput } from "./agent-execution";

export interface StartAgentInput {
sessionId: string;
prompt: string;
}

interface AgentRunnerDeps {
store: SessionStore;
sessions: Map<string, ActiveSession>;
runningAgents: Map<string, AbortController>;
proxyUrl: string;
emitSessionError: (params: { sessionId: string; error: string }) => void;
emitPermissionRequest: (event: PermissionRequestEvent) => void;
ensureSessionReady: (input: EnsureSessionReadyInput) => Promise<void>;
}

export class AgentRunner {
private readonly execution: AgentExecution;
private readonly streamWriter: AgentStreamWriter;

constructor(private readonly deps: AgentRunnerDeps) {
this.execution = new AgentExecution({
store: deps.store,
emitPermissionRequest: deps.emitPermissionRequest,
});
this.streamWriter = new AgentStreamWriter({
proxyUrl: deps.proxyUrl,
emitSessionError: deps.emitSessionError,
ensureSessionReady: deps.ensureSessionReady,
isSessionActive: (sessionId) => this.deps.sessions.has(sessionId),
});
}

private abortExistingAgent({ sessionId }: { sessionId: string }): void {
const existingController = this.deps.runningAgents.get(sessionId);
if (!existingController) return;
console.warn(`[chat/session] Aborting previous agent run for ${sessionId}`);
existingController.abort();
if (this.deps.runningAgents.get(sessionId) === existingController) {
this.deps.runningAgents.delete(sessionId);
}
}

async startAgent({ sessionId, prompt }: StartAgentInput): Promise<void> {
const session = this.deps.sessions.get(sessionId);
if (!session) {
console.error(
`[chat/session] Session ${sessionId} not found for startAgent`,
);
this.deps.emitSessionError({
sessionId,
error: "Session not active",
});
return;
}

this.abortExistingAgent({ sessionId });

const abortController = new AbortController();
this.deps.runningAgents.set(sessionId, abortController);
Comment thread
coderabbitai[bot] marked this conversation as resolved.

const messageId = crypto.randomUUID();
let headers: Record<string, string> | null = null;
let batcher: ChunkBatcher | null = null;
let watchdog: GenerationWatchdog | null = null;

try {
const prepared = await this.streamWriter.prepareStream({
sessionId,
session,
abortController,
});
headers = prepared.headers;
batcher = prepared.batcher;
watchdog = prepared.watchdog;

await this.execution.execute({
session,
sessionId,
prompt,
abortController,
onChunk: (chunk) => {
this.streamWriter.onAssistantChunk({
watchdog: watchdog as GenerationWatchdog,
batcher: batcher as ChunkBatcher,
messageId,
chunk,
});
},
});
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
if (!abortController.signal.aborted) {
console.error(
`[chat/session] Agent execution failed for ${sessionId}:`,
message,
);
this.deps.emitSessionError({ sessionId, error: message });
} else if (watchdog?.wasTriggered) {
console.warn(
`[chat/session] Agent aborted by watchdog for ${sessionId}:`,
message,
);
}
} finally {
watchdog?.clear();
await this.streamWriter.drainChunkBatcher({
sessionId,
batcher,
abortController,
});
await this.streamWriter.finalizeGeneration({
sessionId,
session,
messageId,
headers,
});
if (this.deps.runningAgents.get(sessionId) === abortController) {
this.deps.runningAgents.delete(sessionId);
}
}
}

resolvePermission(input: ResolvePermissionInput): void {
this.execution.resolvePermission(input);
}
}
Loading
Loading