Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions apps/streams/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@
"test:conformance": "npx @durable-streams/server-conformance-tests --run http://localhost:8080"
},
"dependencies": {
"@durable-streams/server": "^0.2.0"
"@durable-streams/client": "^0.2.0",
"@durable-streams/server": "^0.2.0",
"@superset/durable-session": "workspace:*",
"@tanstack/db": "^0.5.22",
"@hono/node-server": "^1.13.0",
"hono": "^4.4.0",
"zod": "^4.1.12"
},
"devDependencies": {
"@durable-streams/client": "^0.2.0",
"@durable-streams/server-conformance-tests": "^0.2.0",
"@superset/typescript": "workspace:*",
Comment thread
coderabbitai[bot] marked this conversation as resolved.
"@types/node": "^24.9.1",
Expand Down
7 changes: 7 additions & 0 deletions apps/streams/src/handlers/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export {
handleInvokeAgent,
handleRegisterAgents,
handleUnregisterAgent,
} from "./invoke-agent";
export { handleSendMessage } from "./send-message";
export { createStreamWriter, StreamWriter } from "./stream-writer";
101 changes: 101 additions & 0 deletions apps/streams/src/handlers/invoke-agent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import type { Context } from "hono";
import { z } from "zod";
import type { AIDBSessionProtocol } from "../protocol";
import { type AgentSpec, agentSpecSchema } from "../types";

const invokeAgentRequestSchema = z.object({
agent: agentSpecSchema,
messages: z.array(
z.object({
role: z.string(),
content: z.string(),
}),
),
});

type InvokeAgentRequest = z.infer<typeof invokeAgentRequestSchema>;

export async function handleInvokeAgent(
c: Context,
protocol: AIDBSessionProtocol,
): Promise<Response> {
const sessionId = c.req.param("sessionId");

let body: InvokeAgentRequest;
try {
const rawBody = await c.req.json();
body = invokeAgentRequestSchema.parse(rawBody);
} catch (error) {
return c.json(
{ error: "Invalid request body", details: (error as Error).message },
400,
);
}

try {
const stream = await protocol.getOrCreateSession(sessionId);
await protocol.invokeAgent(stream, sessionId, body.agent, body.messages);
return c.json({ success: true }, 200);
} catch (error) {
console.error("Failed to invoke agent:", error);
return c.json(
{ error: "Failed to invoke agent", details: (error as Error).message },
500,
);
}
}

export async function handleRegisterAgents(
c: Context,
protocol: AIDBSessionProtocol,
): Promise<Response> {
const sessionId = c.req.param("sessionId");

let agents: AgentSpec[];
try {
const rawBody = await c.req.json();
const parsed = z
.object({ agents: z.array(agentSpecSchema) })
.parse(rawBody);
agents = parsed.agents;
} catch (error) {
return c.json(
{ error: "Invalid request body", details: (error as Error).message },
400,
);
}

try {
await protocol.getOrCreateSession(sessionId);
await protocol.registerAgents(sessionId, agents);
return c.json({ success: true }, 200);
} catch (error) {
console.error("Failed to register agents:", error);
return c.json(
{ error: "Failed to register agents", details: (error as Error).message },
500,
);
}
}

export async function handleUnregisterAgent(
c: Context,
protocol: AIDBSessionProtocol,
): Promise<Response> {
const sessionId = c.req.param("sessionId");
const agentId = c.req.param("agentId");

try {
await protocol.unregisterAgent(sessionId, agentId);
return new Response(null, { status: 204 });
} catch (error) {
console.error("Failed to unregister agent:", error);
return c.json(
{
error: "Failed to unregister agent",
details: (error as Error).message,
},
500,
);
}
Comment on lines +81 to +100
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find unregisterAgent implementation in protocol
find . -name "*.ts" -type f | xargs grep -l "unregisterAgent" | head -10

Repository: superset-sh/superset

Length of output: 282


🏁 Script executed:

#!/bin/bash
# Look for the invoke-agent.ts file and examine the surrounding context
fd "invoke-agent.ts" --type f

Repository: superset-sh/superset

Length of output: 105


🏁 Script executed:

#!/bin/bash
# Search for getOrCreateSession usage in the handlers directory
rg "getOrCreateSession" --type ts

Repository: superset-sh/superset

Length of output: 910


🏁 Script executed:

#!/bin/bash
# Read the protocol.ts file to examine unregisterAgent and getOrCreateSession implementations
wc -l apps/streams/src/protocol.ts

Repository: superset-sh/superset

Length of output: 96


🏁 Script executed:

#!/bin/bash
# Read the invoke-agent.ts file to examine all handlers
wc -l apps/streams/src/handlers/invoke-agent.ts

Repository: superset-sh/superset

Length of output: 109


🏁 Script executed:

#!/bin/bash
# Read the entire invoke-agent.ts to see all handlers and patterns
cat -n apps/streams/src/handlers/invoke-agent.ts

Repository: superset-sh/superset

Length of output: 3266


🏁 Script executed:

#!/bin/bash
# Read the unregisterAgent and getOrCreateSession methods from protocol.ts
# First, let's find their line numbers
rg -n "async unregisterAgent|async getOrCreateSession" apps/streams/src/protocol.ts

Repository: superset-sh/superset

Length of output: 173


🏁 Script executed:

#!/bin/bash
# Read the unregisterAgent implementation from protocol.ts
sed -n '563,600p' apps/streams/src/protocol.ts

Repository: superset-sh/superset

Length of output: 1201


🏁 Script executed:

#!/bin/bash
# Also read getOrCreateSession to understand what it does
sed -n '82,120p' apps/streams/src/protocol.ts

Repository: superset-sh/superset

Length of output: 1091


Add session validation before unregistering agent to match the pattern in other handlers and avoid silent failures.

handleUnregisterAgent does not call getOrCreateSession first, unlike handleInvokeAgent (line 36) and handleRegisterAgents (line 69). The unregisterAgent implementation silently succeeds when the session doesn't exist—it returns without error or logging when the session state is not found. This violates the guideline to never swallow errors silently.

Either validate the session exists before unregistering (consistent with other handlers) or explicitly handle and log the missing session case.

🤖 Prompt for AI Agents
In `@apps/streams/src/handlers/invoke-agent.ts` around lines 81 - 100,
handleUnregisterAgent currently calls protocol.unregisterAgent without
validating the session; mirror the pattern used in handleInvokeAgent and
handleRegisterAgents by checking the session first. Call getOrCreateSession (or
the appropriate session lookup helper) with the same sessionId extraction used
in handleUnregisterAgent to ensure the session exists, return an appropriate
error response (e.g., 404 or a JSON error) if the session is missing, and only
then call protocol.unregisterAgent(sessionId, agentId); also log the
missing-session case so it isn’t silently swallowed.

}
61 changes: 61 additions & 0 deletions apps/streams/src/handlers/send-message.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import type { Context } from "hono";
import type { AIDBSessionProtocol } from "../protocol";
import {
type SendMessageRequest,
type SendMessageResponse,
sendMessageRequestSchema,
} from "../types";

export async function handleSendMessage(
c: Context,
protocol: AIDBSessionProtocol,
): Promise<Response> {
const sessionId = c.req.param("sessionId");

let body: SendMessageRequest;
try {
const rawBody = await c.req.json();
body = sendMessageRequestSchema.parse(rawBody);
} catch (error) {
return c.json(
{ error: "Invalid request body", details: (error as Error).message },
400,
);
}

const actorId =
body.actorId ?? c.req.header("X-Actor-Id") ?? crypto.randomUUID();

const messageId = body.messageId ?? crypto.randomUUID();

try {
const stream = await protocol.getOrCreateSession(sessionId);

await protocol.writeUserMessage(
stream,
sessionId,
messageId,
actorId,
body.content,
body.txid,
);

if (body.agent) {
const messageHistory = await protocol.getMessageHistory(sessionId);
protocol
.invokeAgent(stream, sessionId, body.agent, messageHistory)
.catch((err) => {
console.error("[streams/send-message] Agent invocation failed:", err);
});
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

const response: SendMessageResponse = { messageId };
return c.json(response, 200);
} catch (error) {
console.error("Failed to send message:", error);
return c.json(
{ error: "Failed to send message", details: (error as Error).message },
500,
);
}
}
91 changes: 91 additions & 0 deletions apps/streams/src/handlers/stream-writer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import type { DurableStream } from "@durable-streams/client";
import type { AIDBSessionProtocol } from "../protocol";
import type { StreamChunk } from "../types";

type MessageRole = "user" | "assistant" | "system";

export class StreamWriter {
constructor(
private readonly protocol: AIDBSessionProtocol,
private readonly stream: DurableStream,
private readonly sessionId: string,
) {}

async writeUserMessage(
messageId: string,
actorId: string,
content: string,
txid?: string,
): Promise<void> {
await this.protocol.writeUserMessage(
this.stream,
this.sessionId,
messageId,
actorId,
content,
txid,
);
}

async writeChunk(
messageId: string,
actorId: string,
role: MessageRole,
chunk: StreamChunk,
txid?: string,
): Promise<void> {
await this.protocol.writeChunk(
this.stream,
this.sessionId,
messageId,
actorId,
role,
chunk,
txid,
);
}

async writeToolResult(
messageId: string,
actorId: string,
toolCallId: string,
output: unknown,
error: string | null,
txid?: string,
): Promise<void> {
await this.protocol.writeToolResult(
this.stream,
this.sessionId,
messageId,
actorId,
toolCallId,
output,
error,
txid,
);
}

async writeApprovalResponse(
actorId: string,
approvalId: string,
approved: boolean,
txid?: string,
): Promise<void> {
await this.protocol.writeApprovalResponse(
this.stream,
this.sessionId,
actorId,
approvalId,
approved,
txid,
);
}
}

export function createStreamWriter(
protocol: AIDBSessionProtocol,
stream: DurableStream,
sessionId: string,
): StreamWriter {
return new StreamWriter(protocol, stream, sessionId);
}
Loading
Loading