From 5463e310669cbf0826f0e32acf6fcab394824835 Mon Sep 17 00:00:00 2001 From: noamzbr Date: Tue, 16 Dec 2025 15:53:44 +0200 Subject: [PATCH 1/6] fix(acp): use single global event subscription and route by sessionID Avoid per-session SSE subscriptions that caused cross-session event pollution and duplicate events on repeated loadSession(). ACP now maintains one global event stream and forwards updates to the correct ACP session using event payload sessionID. Adds ACP regression tests for cross-session isolation and subscription duplication. --- packages/opencode/src/acp/agent.ts | 537 +++++++++--------- packages/opencode/src/acp/session.ts | 4 + .../test/acp/event-subscription.test.ts | 290 ++++++++++ 3 files changed, 578 insertions(+), 253 deletions(-) create mode 100644 packages/opencode/test/acp/event-subscription.test.ts diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index d20c971ebc5..740e2679e1c 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -20,7 +20,7 @@ import { } from "@agentclientprotocol/sdk" import { Log } from "../util/log" import { ACPSessionManager } from "./session" -import type { ACPConfig, ACPSessionState } from "./types" +import type { ACPConfig } from "./types" import { Provider } from "../provider/provider" import { Installation } from "@/installation" import { MessageV2 } from "@/session/message-v2" @@ -28,7 +28,7 @@ import { Config } from "@/config/config" import { Todo } from "@/session/todo" import { z } from "zod" import { LoadAPIKeyError } from "ai" -import type { OpencodeClient, SessionMessageResponse } from "@opencode-ai/sdk/v2" +import type { Event, OpencodeClient, SessionMessageResponse } from "@opencode-ai/sdk/v2" export namespace ACP { const log = Log.create({ service: "acp-agent" }) @@ -49,285 +49,320 @@ export namespace ACP { private connection: AgentSideConnection private config: ACPConfig private sdk: OpencodeClient - private sessionManager + private sessionManager: ACPSessionManager + private eventAbort = new AbortController() + private eventStarted = false + private permissionOptions: PermissionOption[] = [ + { optionId: "once", kind: "allow_once", name: "Allow once" }, + { optionId: "always", kind: "allow_always", name: "Always allow" }, + { optionId: "reject", kind: "reject_once", name: "Reject" }, + ] constructor(connection: AgentSideConnection, config: ACPConfig) { this.connection = connection this.config = config this.sdk = config.sdk this.sessionManager = new ACPSessionManager(this.sdk) + this.startEventSubscription() } - private setupEventSubscriptions(session: ACPSessionState) { - const sessionId = session.id - const directory = session.cwd + private startEventSubscription() { + if (this.eventStarted) return + this.eventStarted = true + this.runEventSubscription().catch((error) => { + if (this.eventAbort.signal.aborted) return + log.error("event subscription failed", { error }) + }) + } - const options: PermissionOption[] = [ - { optionId: "once", kind: "allow_once", name: "Allow once" }, - { optionId: "always", kind: "allow_always", name: "Always allow" }, - { optionId: "reject", kind: "reject_once", name: "Reject" }, - ] - this.config.sdk.event.subscribe({ directory }).then(async (events) => { + private async runEventSubscription() { + while (true) { + if (this.eventAbort.signal.aborted) return + const events = await this.sdk.global.event({ + signal: this.eventAbort.signal, + }) for await (const event of events.stream) { - switch (event.type) { - case "permission.updated": - try { - const permission = event.properties - const res = await this.connection - .requestPermission({ + if (this.eventAbort.signal.aborted) return + const payload = (event as any)?.payload + if (!payload) continue + await this.handleEvent(payload as Event).catch((error) => { + log.error("failed to handle event", { error, type: payload.type }) + }) + } + } + } + + private async handleEvent(event: Event) { + switch (event.type) { + case "permission.updated": { + const permission = event.properties + const session = this.sessionManager.tryGet(permission.sessionID) + if (!session) return + const directory = session.cwd + + const res = await this.connection + .requestPermission({ + sessionId: permission.sessionID, + toolCall: { + toolCallId: permission.callID ?? permission.id, + status: "pending", + title: permission.title, + rawInput: permission.metadata, + kind: toToolKind(permission.type), + locations: toLocations(permission.type, permission.metadata), + }, + options: this.permissionOptions, + }) + .catch(async (error) => { + log.error("failed to request permission from ACP", { + error, + permissionID: permission.id, + sessionID: permission.sessionID, + }) + await this.sdk.permission.respond({ + sessionID: permission.sessionID, + permissionID: permission.id, + response: "reject", + directory, + }) + return undefined + }) + + if (!res) return + if (res.outcome.outcome !== "selected") { + await this.sdk.permission.respond({ + sessionID: permission.sessionID, + permissionID: permission.id, + response: "reject", + directory, + }) + return + } + + await this.sdk.permission.respond({ + sessionID: permission.sessionID, + permissionID: permission.id, + response: res.outcome.optionId as "once" | "always" | "reject", + directory, + }) + return + } + + case "message.part.updated": { + log.info("message part updated", { event: event.properties }) + const props = event.properties + const part = props.part + const session = this.sessionManager.tryGet(part.sessionID) + if (!session) return + const sessionId = session.id + const directory = session.cwd + + const message = await this.sdk.session + .message( + { + sessionID: part.sessionID, + messageID: part.messageID, + directory, + }, + { throwOnError: true }, + ) + .then((x) => x.data) + .catch((error) => { + log.error("unexpected error when fetching message", { error }) + return undefined + }) + + if (!message || message.info.role !== "assistant") return + + if (part.type === "tool") { + switch (part.state.status) { + case "pending": + await this.connection + .sessionUpdate({ sessionId, - toolCall: { - toolCallId: permission.callID ?? permission.id, + update: { + sessionUpdate: "tool_call", + toolCallId: part.callID, + title: part.tool, + kind: toToolKind(part.tool), status: "pending", - title: permission.title, - rawInput: permission.metadata, - kind: toToolKind(permission.type), - locations: toLocations(permission.type, permission.metadata), + locations: [], + rawInput: {}, }, - options, - }) - .catch(async (error) => { - log.error("failed to request permission from ACP", { - error, - permissionID: permission.id, - sessionID: permission.sessionID, - }) - await this.config.sdk.permission.respond({ - sessionID: permission.sessionID, - permissionID: permission.id, - response: "reject", - directory, - }) - return }) - if (!res) return - if (res.outcome.outcome !== "selected") { - await this.config.sdk.permission.respond({ - sessionID: permission.sessionID, - permissionID: permission.id, - response: "reject", - directory, + .catch((error) => { + log.error("failed to send tool pending to ACP", { error }) }) - return - } - await this.config.sdk.permission.respond({ - sessionID: permission.sessionID, - permissionID: permission.id, - response: res.outcome.optionId as "once" | "always" | "reject", - directory, - }) - } catch (err) { - log.error("unexpected error when handling permission", { error: err }) - } finally { - break - } + return - case "message.part.updated": - log.info("message part updated", { event: event.properties }) - try { - const props = event.properties - const { part } = props - - const message = await this.config.sdk.session - .message( - { - sessionID: part.sessionID, - messageID: part.messageID, - directory, + case "running": + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "in_progress", + locations: toLocations(part.tool, part.state.input), + rawInput: part.state.input, + }, + }) + .catch((error) => { + log.error("failed to send tool in_progress to ACP", { error }) + }) + return + + case "completed": { + const kind = toToolKind(part.tool) + const content: ToolCallContent[] = [ + { + type: "content", + content: { + type: "text", + text: part.state.output, }, - { throwOnError: true }, - ) - .then((x) => x.data) - .catch((err) => { - log.error("unexpected error when fetching message", { error: err }) - return undefined + }, + ] + + if (kind === "edit") { + const input = part.state.input + const filePath = typeof input["filePath"] === "string" ? input["filePath"] : "" + const oldText = typeof input["oldString"] === "string" ? input["oldString"] : "" + const newText = + typeof input["newString"] === "string" + ? input["newString"] + : typeof input["content"] === "string" + ? input["content"] + : "" + content.push({ + type: "diff", + path: filePath, + oldText, + newText, }) + } - if (!message || message.info.role !== "assistant") return - - if (part.type === "tool") { - switch (part.state.status) { - case "pending": - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "tool_call", - toolCallId: part.callID, - title: part.tool, - kind: toToolKind(part.tool), - status: "pending", - locations: [], - rawInput: {}, - }, - }) - .catch((err) => { - log.error("failed to send tool pending to ACP", { error: err }) - }) - break - case "running": - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "tool_call_update", - toolCallId: part.callID, - status: "in_progress", - locations: toLocations(part.tool, part.state.input), - rawInput: part.state.input, - }, - }) - .catch((err) => { - log.error("failed to send tool in_progress to ACP", { error: err }) - }) - break - case "completed": - const kind = toToolKind(part.tool) - const content: ToolCallContent[] = [ - { - type: "content", - content: { - type: "text", - text: part.state.output, - }, - }, - ] - - if (kind === "edit") { - const input = part.state.input - const filePath = typeof input["filePath"] === "string" ? input["filePath"] : "" - const oldText = typeof input["oldString"] === "string" ? input["oldString"] : "" - const newText = - typeof input["newString"] === "string" - ? input["newString"] - : typeof input["content"] === "string" - ? input["content"] - : "" - content.push({ - type: "diff", - path: filePath, - oldText, - newText, - }) - } - - if (part.tool === "todowrite") { - const parsedTodos = z.array(Todo.Info).safeParse(JSON.parse(part.state.output)) - if (parsedTodos.success) { - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "plan", - entries: parsedTodos.data.map((todo) => { - const status: PlanEntry["status"] = - todo.status === "cancelled" ? "completed" : (todo.status as PlanEntry["status"]) - return { - priority: "medium", - status, - content: todo.content, - } - }), - }, - }) - .catch((err) => { - log.error("failed to send session update for todo", { error: err }) - }) - } else { - log.error("failed to parse todo output", { error: parsedTodos.error }) - } - } - - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "tool_call_update", - toolCallId: part.callID, - status: "completed", - kind, - content, - title: part.state.title, - rawOutput: { - output: part.state.output, - metadata: part.state.metadata, - }, - }, - }) - .catch((err) => { - log.error("failed to send tool completed to ACP", { error: err }) - }) - break - case "error": - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "tool_call_update", - toolCallId: part.callID, - status: "failed", - content: [ - { - type: "content", - content: { - type: "text", - text: part.state.error, - }, - }, - ], - rawOutput: { - error: part.state.error, - }, - }, - }) - .catch((err) => { - log.error("failed to send tool error to ACP", { error: err }) - }) - break - } - } else if (part.type === "text") { - const delta = props.delta - if (delta && part.synthetic !== true) { + if (part.tool === "todowrite") { + const parsedTodos = z.array(Todo.Info).safeParse(JSON.parse(part.state.output)) + if (parsedTodos.success) { await this.connection .sessionUpdate({ sessionId, update: { - sessionUpdate: "agent_message_chunk", - content: { - type: "text", - text: delta, - }, + sessionUpdate: "plan", + entries: parsedTodos.data.map((todo) => { + const status: PlanEntry["status"] = + todo.status === "cancelled" ? "completed" : (todo.status as PlanEntry["status"]) + return { + priority: "medium", + status, + content: todo.content, + } + }), }, }) - .catch((err) => { - log.error("failed to send text to ACP", { error: err }) + .catch((error) => { + log.error("failed to send session update for todo", { error }) }) + } else { + log.error("failed to parse todo output", { error: parsedTodos.error }) } - } else if (part.type === "reasoning") { - const delta = props.delta - if (delta) { - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "agent_thought_chunk", + } + + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "completed", + kind, + content, + title: part.state.title, + rawOutput: { + output: part.state.output, + metadata: part.state.metadata, + }, + }, + }) + .catch((error) => { + log.error("failed to send tool completed to ACP", { error }) + }) + return + } + + case "error": + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "failed", + content: [ + { + type: "content", content: { type: "text", - text: delta, + text: part.state.error, }, }, - }) - .catch((err) => { - log.error("failed to send reasoning to ACP", { error: err }) - }) - } - } - } finally { - break - } + ], + rawOutput: { + error: part.state.error, + }, + }, + }) + .catch((error) => { + log.error("failed to send tool error to ACP", { error }) + }) + return + } + } + + if (part.type === "text") { + const delta = props.delta + if (delta && part.synthetic !== true) { + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "agent_message_chunk", + content: { + type: "text", + text: delta, + }, + }, + }) + .catch((error) => { + log.error("failed to send text to ACP", { error }) + }) + } + return } + + if (part.type === "reasoning") { + const delta = props.delta + if (delta) { + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "agent_thought_chunk", + content: { + type: "text", + text: delta, + }, + }, + }) + .catch((error) => { + log.error("failed to send reasoning to ACP", { error }) + }) + } + } + return } - }) + } } async initialize(params: InitializeRequest): Promise { @@ -392,8 +427,6 @@ export namespace ACP { sessionId, }) - this.setupEventSubscriptions(state) - return { sessionId, models: load.models, @@ -419,7 +452,7 @@ export namespace ACP { const model = await defaultModel(this.config, directory) // Store ACP session state - const state = await this.sessionManager.load(sessionId, params.cwd, params.mcpServers, model) + await this.sessionManager.load(sessionId, params.cwd, params.mcpServers, model) log.info("load_session", { sessionId, mcpServers: params.mcpServers.length }) @@ -429,8 +462,6 @@ export namespace ACP { sessionId, }) - this.setupEventSubscriptions(state) - // Replay session history const messages = await this.sdk.session .messages( diff --git a/packages/opencode/src/acp/session.ts b/packages/opencode/src/acp/session.ts index 70b65834705..151fa5646ba 100644 --- a/packages/opencode/src/acp/session.ts +++ b/packages/opencode/src/acp/session.ts @@ -13,6 +13,10 @@ export class ACPSessionManager { this.sdk = sdk } + tryGet(sessionId: string): ACPSessionState | undefined { + return this.sessions.get(sessionId) + } + async create(cwd: string, mcpServers: McpServer[], model?: ACPSessionState["model"]): Promise { const session = await this.sdk.session .create( diff --git a/packages/opencode/test/acp/event-subscription.test.ts b/packages/opencode/test/acp/event-subscription.test.ts new file mode 100644 index 00000000000..8b68547d13b --- /dev/null +++ b/packages/opencode/test/acp/event-subscription.test.ts @@ -0,0 +1,290 @@ +import { describe, expect, test } from "bun:test" +import { ACP } from "../../src/acp/agent" +import type { AgentSideConnection } from "@agentclientprotocol/sdk" +import type { Event } from "@opencode-ai/sdk/v2" + +type SessionUpdateParams = Parameters[0] +type RequestPermissionParams = Parameters[0] +type RequestPermissionResult = Awaited> + +type GlobalEventEnvelope = { + directory?: string + payload?: Event +} + +type EventController = { + push: (event: GlobalEventEnvelope) => void + close: () => void +} + +function createEventStream() { + const queue: GlobalEventEnvelope[] = [] + const waiters: Array<(value: GlobalEventEnvelope | undefined) => void> = [] + const state = { closed: false } + + const push = (event: GlobalEventEnvelope) => { + const waiter = waiters.shift() + if (waiter) { + waiter(event) + return + } + queue.push(event) + } + + const close = () => { + state.closed = true + for (const waiter of waiters.splice(0)) { + waiter(undefined) + } + } + + const stream = async function* (signal?: AbortSignal) { + while (true) { + if (signal?.aborted) return + const next = queue.shift() + if (next) { + yield next + continue + } + if (state.closed) return + const value = await new Promise((resolve) => { + waiters.push(resolve) + if (!signal) return + signal.addEventListener("abort", () => resolve(undefined), { once: true }) + }) + if (!value) return + yield value + } + } + + return { controller: { push, close } satisfies EventController, stream } +} + +function createFakeAgent() { + const updates = new Map() + const chunks = new Map() + const record = (sessionId: string, type: string) => { + const list = updates.get(sessionId) ?? [] + list.push(type) + updates.set(sessionId, list) + } + + const connection = { + async sessionUpdate(params: SessionUpdateParams) { + const update = params.update + const type = update?.sessionUpdate ?? "unknown" + record(params.sessionId, type) + if (update?.sessionUpdate === "agent_message_chunk") { + const content = update.content + if (content?.type !== "text") return + if (typeof content.text !== "string") return + chunks.set(params.sessionId, (chunks.get(params.sessionId) ?? "") + content.text) + } + }, + async requestPermission(_params: RequestPermissionParams): Promise { + return { outcome: { outcome: "selected", optionId: "once" } } as RequestPermissionResult + }, + } as unknown as AgentSideConnection + + const { controller, stream } = createEventStream() + const calls = { + eventSubscribe: 0, + sessionCreate: 0, + } + + const sdk = { + global: { + event: async (opts?: { signal?: AbortSignal }) => { + calls.eventSubscribe++ + return { stream: stream(opts?.signal) } + }, + }, + session: { + create: async (_params?: any) => { + calls.sessionCreate++ + return { + data: { + id: `ses_${calls.sessionCreate}`, + time: { created: new Date().toISOString() }, + }, + } + }, + get: async (_params?: any) => { + return { + data: { + id: "ses_1", + time: { created: new Date().toISOString() }, + }, + } + }, + messages: async () => { + return { data: [] } + }, + message: async () => { + return { + data: { + info: { + role: "assistant", + }, + }, + } + }, + }, + permission: { + respond: async () => { + return { data: true } + }, + }, + config: { + providers: async () => { + return { + data: { + providers: [ + { + id: "opencode", + name: "opencode", + models: { + "big-pickle": { id: "big-pickle", name: "big-pickle" }, + }, + }, + ], + }, + } + }, + }, + app: { + agents: async () => { + return { + data: [ + { + name: "build", + description: "build", + mode: "agent", + }, + ], + } + }, + }, + command: { + list: async () => { + return { data: [] } + }, + }, + mcp: { + add: async () => { + return { data: true } + }, + }, + } as any + + const agent = new ACP.Agent(connection, { + sdk, + defaultModel: { providerID: "opencode", modelID: "big-pickle" }, + } as any) + + const stop = () => { + controller.close() + ;(agent as any).eventAbort.abort() + } + + return { agent, controller, calls, updates, chunks, stop } +} + +describe("acp.agent event subscription", () => { + test("routes message.part.updated by the event sessionID (no cross-session pollution)", async () => { + const { agent, controller, updates, stop } = createFakeAgent() + const cwd = "/tmp/opencode-acp-test" + + const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + + controller.push({ + directory: cwd, + payload: { + type: "message.part.updated", + properties: { + part: { + sessionID: sessionB, + messageID: "msg_1", + type: "text", + synthetic: false, + }, + delta: "hello", + }, + }, + } as any) + + await new Promise((r) => setTimeout(r, 10)) + + expect((updates.get(sessionA) ?? []).includes("agent_message_chunk")).toBe(false) + expect((updates.get(sessionB) ?? []).includes("agent_message_chunk")).toBe(true) + + stop() + }) + + test("keeps concurrent sessions isolated when message.part.updated events are interleaved", async () => { + const { agent, controller, chunks, stop } = createFakeAgent() + const cwd = "/tmp/opencode-acp-test" + + const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + + const tokenA = ["ALPHA_", "111", "_X"] + const tokenB = ["BETA_", "222", "_Y"] + + const push = (sessionId: string, messageID: string, delta: string) => { + controller.push({ + directory: cwd, + payload: { + type: "message.part.updated", + properties: { + part: { + sessionID: sessionId, + messageID, + type: "text", + synthetic: false, + }, + delta, + }, + }, + } as any) + } + + push(sessionA, "msg_a", tokenA[0]) + push(sessionB, "msg_b", tokenB[0]) + push(sessionA, "msg_a", tokenA[1]) + push(sessionB, "msg_b", tokenB[1]) + push(sessionA, "msg_a", tokenA[2]) + push(sessionB, "msg_b", tokenB[2]) + + await new Promise((r) => setTimeout(r, 20)) + + const a = chunks.get(sessionA) ?? "" + const b = chunks.get(sessionB) ?? "" + + expect(a).toContain(tokenA.join("")) + expect(b).toContain(tokenB.join("")) + for (const part of tokenB) expect(a).not.toContain(part) + for (const part of tokenA) expect(b).not.toContain(part) + + stop() + }) + + test("does not create additional event subscriptions on repeated loadSession()", async () => { + const { agent, calls, stop } = createFakeAgent() + const cwd = "/tmp/opencode-acp-test" + + const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + + await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) + await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) + await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) + await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) + + expect(calls.eventSubscribe).toBe(1) + + stop() + }) +}) + + From c00a159466af4e7dc2b98576327ad9978f777892 Mon Sep 17 00:00:00 2001 From: noamzbr Date: Tue, 16 Dec 2025 16:09:10 +0200 Subject: [PATCH 2/6] format fix --- packages/opencode/test/acp/event-subscription.test.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/opencode/test/acp/event-subscription.test.ts b/packages/opencode/test/acp/event-subscription.test.ts index 8b68547d13b..71cba58b509 100644 --- a/packages/opencode/test/acp/event-subscription.test.ts +++ b/packages/opencode/test/acp/event-subscription.test.ts @@ -286,5 +286,3 @@ describe("acp.agent event subscription", () => { stop() }) }) - - From 33c00403a063d9e3e28033b87f348b895de5c531 Mon Sep 17 00:00:00 2001 From: noamzbr Date: Mon, 22 Dec 2025 17:03:41 +0200 Subject: [PATCH 3/6] update tests --- .../test/acp/event-subscription.test.ts | 164 ++++++++++-------- 1 file changed, 92 insertions(+), 72 deletions(-) diff --git a/packages/opencode/test/acp/event-subscription.test.ts b/packages/opencode/test/acp/event-subscription.test.ts index 71cba58b509..18bc1013586 100644 --- a/packages/opencode/test/acp/event-subscription.test.ts +++ b/packages/opencode/test/acp/event-subscription.test.ts @@ -2,6 +2,8 @@ import { describe, expect, test } from "bun:test" import { ACP } from "../../src/acp/agent" import type { AgentSideConnection } from "@agentclientprotocol/sdk" import type { Event } from "@opencode-ai/sdk/v2" +import { Instance } from "../../src/project/instance" +import { tmpdir } from "../fixture/fixture" type SessionUpdateParams = Parameters[0] type RequestPermissionParams = Parameters[0] @@ -192,97 +194,115 @@ function createFakeAgent() { describe("acp.agent event subscription", () => { test("routes message.part.updated by the event sessionID (no cross-session pollution)", async () => { - const { agent, controller, updates, stop } = createFakeAgent() - const cwd = "/tmp/opencode-acp-test" - - const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) - const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) - - controller.push({ - directory: cwd, - payload: { - type: "message.part.updated", - properties: { - part: { - sessionID: sessionB, - messageID: "msg_1", - type: "text", - synthetic: false, + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const { agent, controller, updates, stop } = createFakeAgent() + const cwd = "/tmp/opencode-acp-test" + + const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + + controller.push({ + directory: cwd, + payload: { + type: "message.part.updated", + properties: { + part: { + sessionID: sessionB, + messageID: "msg_1", + type: "text", + synthetic: false, + }, + delta: "hello", + }, }, - delta: "hello", - }, - }, - } as any) + } as any) - await new Promise((r) => setTimeout(r, 10)) + await new Promise((r) => setTimeout(r, 10)) - expect((updates.get(sessionA) ?? []).includes("agent_message_chunk")).toBe(false) - expect((updates.get(sessionB) ?? []).includes("agent_message_chunk")).toBe(true) + expect((updates.get(sessionA) ?? []).includes("agent_message_chunk")).toBe(false) + expect((updates.get(sessionB) ?? []).includes("agent_message_chunk")).toBe(true) - stop() + stop() + }, + }) }) test("keeps concurrent sessions isolated when message.part.updated events are interleaved", async () => { - const { agent, controller, chunks, stop } = createFakeAgent() - const cwd = "/tmp/opencode-acp-test" - - const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) - const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) - - const tokenA = ["ALPHA_", "111", "_X"] - const tokenB = ["BETA_", "222", "_Y"] - - const push = (sessionId: string, messageID: string, delta: string) => { - controller.push({ - directory: cwd, - payload: { - type: "message.part.updated", - properties: { - part: { - sessionID: sessionId, - messageID, - type: "text", - synthetic: false, + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const { agent, controller, chunks, stop } = createFakeAgent() + const cwd = "/tmp/opencode-acp-test" + + const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + + const tokenA = ["ALPHA_", "111", "_X"] + const tokenB = ["BETA_", "222", "_Y"] + + const push = (sessionId: string, messageID: string, delta: string) => { + controller.push({ + directory: cwd, + payload: { + type: "message.part.updated", + properties: { + part: { + sessionID: sessionId, + messageID, + type: "text", + synthetic: false, + }, + delta, + }, }, - delta, - }, - }, - } as any) - } + } as any) + } - push(sessionA, "msg_a", tokenA[0]) - push(sessionB, "msg_b", tokenB[0]) - push(sessionA, "msg_a", tokenA[1]) - push(sessionB, "msg_b", tokenB[1]) - push(sessionA, "msg_a", tokenA[2]) - push(sessionB, "msg_b", tokenB[2]) + push(sessionA, "msg_a", tokenA[0]) + push(sessionB, "msg_b", tokenB[0]) + push(sessionA, "msg_a", tokenA[1]) + push(sessionB, "msg_b", tokenB[1]) + push(sessionA, "msg_a", tokenA[2]) + push(sessionB, "msg_b", tokenB[2]) - await new Promise((r) => setTimeout(r, 20)) + await new Promise((r) => setTimeout(r, 20)) - const a = chunks.get(sessionA) ?? "" - const b = chunks.get(sessionB) ?? "" + const a = chunks.get(sessionA) ?? "" + const b = chunks.get(sessionB) ?? "" - expect(a).toContain(tokenA.join("")) - expect(b).toContain(tokenB.join("")) - for (const part of tokenB) expect(a).not.toContain(part) - for (const part of tokenA) expect(b).not.toContain(part) + expect(a).toContain(tokenA.join("")) + expect(b).toContain(tokenB.join("")) + for (const part of tokenB) expect(a).not.toContain(part) + for (const part of tokenA) expect(b).not.toContain(part) - stop() + stop() + }, + }) }) test("does not create additional event subscriptions on repeated loadSession()", async () => { - const { agent, calls, stop } = createFakeAgent() - const cwd = "/tmp/opencode-acp-test" + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const { agent, calls, stop } = createFakeAgent() + const cwd = "/tmp/opencode-acp-test" - const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) - await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) - await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) - await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) - await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) + await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) + await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) + await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) + await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) - expect(calls.eventSubscribe).toBe(1) + expect(calls.eventSubscribe).toBe(1) - stop() + stop() + }, + }) }) }) From 7088b42cb1a86aa33defc03f9b597f845948836f Mon Sep 17 00:00:00 2001 From: Lior Shkiller Date: Thu, 25 Dec 2025 22:19:29 +0200 Subject: [PATCH 4/6] fix(acp): preserve file attachment metadata during session replay - Preserve original filenames from URI in image blocks - Store resource blocks as file parts instead of flattening to text - Replay file parts as proper ACP image/resource blocks during session load - Convert text-based files (JSON, XML, etc.) to text parts for LLM compatibility - Keep binary files (images) as file parts for proper handling This fixes the issue where file attachments lost their metadata (filename, MIME type) when sessions were reloaded, causing them to appear as plain text instead of structured attachments. --- packages/opencode/src/acp/agent.ts | 86 ++++++++++++++++++++- packages/opencode/src/session/message-v2.ts | 29 ++++++- 2 files changed, 109 insertions(+), 6 deletions(-) diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index 19333fab58e..f0c82a03584 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -675,6 +675,62 @@ export namespace ACP { log.error("failed to send reasoning to ACP", { error: err }) }) } + } else if (part.type === "file") { + // Replay file attachments as ACP image or resource blocks + const url = part.url as string + const filename = part.filename as string + const mime = (part.mime || "application/octet-stream") as string + const isImage = mime.startsWith("image/") + + if (url.startsWith("data:")) { + // Extract base64 data from data URI + const base64Match = url.match(/^data:[^;]+;base64,(.*)$/) + const base64Data = base64Match ? base64Match[1] : "" + + if (isImage) { + // Send as ACP image block + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "user_message_chunk", + content: { + type: "image", + mimeType: mime, + data: base64Data, + uri: `file://${filename}`, + }, + }, + }) + .catch((err) => { + log.error("failed to send image to ACP", { error: err }) + }) + } else { + // Send as ACP resource block + // Decode base64 to text for text-based content + const text = typeof atob !== "undefined" + ? decodeURIComponent(escape(atob(base64Data))) + : Buffer.from(base64Data, "base64").toString("utf-8") + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "user_message_chunk", + content: { + type: "resource", + resource: { + uri: `file://${filename}`, + mimeType: mime, + text, + }, + }, + }, + }) + .catch((err) => { + log.error("failed to send resource to ACP", { error: err }) + }) + } + } } } } @@ -854,18 +910,21 @@ export namespace ACP { }) break case "image": + // Preserve original filename from URI if provided + const imageFilename = part.uri?.replace(/^file:\/\//, "").split("/").pop() || + `image.${part.mimeType.split("/")[1] || "png"}` if (part.data) { parts.push({ type: "file", url: `data:${part.mimeType};base64,${part.data}`, - filename: "image", + filename: imageFilename, mime: part.mimeType, }) } else if (part.uri && part.uri.startsWith("http:")) { parts.push({ type: "file", url: part.uri, - filename: "image", + filename: imageFilename, mime: part.mimeType, }) } @@ -878,11 +937,30 @@ export namespace ACP { break case "resource": + // Handle resources the same way as images - store as file parts + // This preserves metadata (filename, mimeType) for proper replay const resource = part.resource + const resourceFilename = resource.uri?.replace(/^file:\/\//, "").split("/").pop() || "file" + const resourceMime = resource.mimeType || "text/plain" + if ("text" in resource) { + // Convert text content to base64 data URL (same pattern as images) + const base64Text = typeof btoa !== "undefined" + ? btoa(unescape(encodeURIComponent(resource.text))) + : Buffer.from(resource.text, "utf-8").toString("base64") + parts.push({ + type: "file", + url: `data:${resourceMime};base64,${base64Text}`, + filename: resourceFilename, + mime: resourceMime, + }) + } else if ("blob" in resource) { + // Binary blob - already base64 parts.push({ - type: "text", - text: resource.text, + type: "file", + url: `data:${resourceMime};base64,${resource.blob}`, + filename: resourceFilename, + mime: resourceMime, }) } break diff --git a/packages/opencode/src/session/message-v2.ts b/packages/opencode/src/session/message-v2.ts index 14542669ee9..8246e3b297d 100644 --- a/packages/opencode/src/session/message-v2.ts +++ b/packages/opencode/src/session/message-v2.ts @@ -436,14 +436,39 @@ export namespace MessageV2 { type: "text", text: part.text, }) - // text/plain and directory files are converted into text parts, ignore them - if (part.type === "file" && part.mime !== "text/plain" && part.mime !== "application/x-directory") + // Text-based files are converted into text parts for LLM compatibility + // Only send images and other binary formats as file parts + const isTextBasedMime = + part.type === "file" && ( + part.mime.startsWith("text/") || + part.mime === "application/json" || + part.mime === "application/xml" || + part.mime === "application/javascript" || + part.mime === "application/typescript" || + part.mime === "application/x-directory" + ) + if (part.type === "file" && !isTextBasedMime) userMessage.parts.push({ type: "file", url: part.url, mediaType: part.mime, filename: part.filename, }) + // For text-based file parts, decode and send as text with filename header + if (part.type === "file" && isTextBasedMime && part.mime !== "application/x-directory") { + const url = part.url + if (url.startsWith("data:")) { + const base64Match = url.match(/^data:[^;]+;base64,(.*)$/) + if (base64Match) { + const base64Data = base64Match[1] + const text = Buffer.from(base64Data, "base64").toString("utf-8") + userMessage.parts.push({ + type: "text", + text: `[File: ${part.filename || "file"}]\n${text}`, + }) + } + } + } if (part.type === "compaction") { userMessage.parts.push({ From 37172a12d574c9b95cff5a2b6a79258452430ee4 Mon Sep 17 00:00:00 2001 From: Lior Shkiller Date: Mon, 29 Dec 2025 10:21:52 +0200 Subject: [PATCH 5/6] fix(acp): preserve file attachment metadata during session replay - Store ACP resources as file parts to preserve filename and mimeType - Add file part handling in processMessage for proper session replay - Decode text-based files for LLM compatibility in toModelMessage - Preserve original filename from image URI instead of hardcoding 'image' - Follow style guide: avoid else statements, use Buffer instead of atob/btoa --- packages/opencode/src/acp/agent.ts | 144 ++++++++++---------- packages/opencode/src/session/message-v2.ts | 42 +++--- 2 files changed, 95 insertions(+), 91 deletions(-) diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index f0c82a03584..bf20a3c446d 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -675,62 +675,61 @@ export namespace ACP { log.error("failed to send reasoning to ACP", { error: err }) }) } - } else if (part.type === "file") { + } + + if (part.type === "file") { // Replay file attachments as ACP image or resource blocks - const url = part.url as string - const filename = part.filename as string - const mime = (part.mime || "application/octet-stream") as string - const isImage = mime.startsWith("image/") - - if (url.startsWith("data:")) { - // Extract base64 data from data URI - const base64Match = url.match(/^data:[^;]+;base64,(.*)$/) - const base64Data = base64Match ? base64Match[1] : "" - - if (isImage) { - // Send as ACP image block - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "user_message_chunk", - content: { - type: "image", - mimeType: mime, - data: base64Data, - uri: `file://${filename}`, - }, - }, - }) - .catch((err) => { - log.error("failed to send image to ACP", { error: err }) - }) - } else { - // Send as ACP resource block - // Decode base64 to text for text-based content - const text = typeof atob !== "undefined" - ? decodeURIComponent(escape(atob(base64Data))) - : Buffer.from(base64Data, "base64").toString("utf-8") - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "user_message_chunk", - content: { - type: "resource", - resource: { - uri: `file://${filename}`, - mimeType: mime, - text, - }, - }, + const url = part.url + const filename = part.filename ?? "" + const mime = part.mime || "application/octet-stream" + + if (!url.startsWith("data:")) continue + + // Extract base64 data from data URI + const base64Match = url.match(/^data:[^;]+;base64,(.*)$/) + const base64Data = base64Match?.[1] ?? "" + + if (mime.startsWith("image/")) { + // Send as ACP image block + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "user_message_chunk", + content: { + type: "image", + mimeType: mime, + data: base64Data, + uri: `file://${filename}`, }, - }) - .catch((err) => { - log.error("failed to send resource to ACP", { error: err }) - }) - } + }, + }) + .catch((err) => { + log.error("failed to send image to ACP", { error: err }) + }) + continue } + + // Send as ACP resource block + const text = Buffer.from(base64Data, "base64").toString("utf-8") + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "user_message_chunk", + content: { + type: "resource", + resource: { + uri: `file://${filename}`, + mimeType: mime, + text, + }, + }, + }, + }) + .catch((err) => { + log.error("failed to send resource to ACP", { error: err }) + }) } } } @@ -909,7 +908,7 @@ export namespace ACP { text: part.text, }) break - case "image": + case "image": { // Preserve original filename from URI if provided const imageFilename = part.uri?.replace(/^file:\/\//, "").split("/").pop() || `image.${part.mimeType.split("/")[1] || "png"}` @@ -920,7 +919,9 @@ export namespace ACP { filename: imageFilename, mime: part.mimeType, }) - } else if (part.uri && part.uri.startsWith("http:")) { + break + } + if (part.uri?.startsWith("http:")) { parts.push({ type: "file", url: part.uri, @@ -929,6 +930,7 @@ export namespace ACP { }) } break + } case "resource_link": const parsed = parseUri(part.uri) @@ -936,34 +938,34 @@ export namespace ACP { break - case "resource": - // Handle resources the same way as images - store as file parts - // This preserves metadata (filename, mimeType) for proper replay + case "resource": { + // Store resources as file parts to preserve metadata (filename, mimeType) const resource = part.resource - const resourceFilename = resource.uri?.replace(/^file:\/\//, "").split("/").pop() || "file" - const resourceMime = resource.mimeType || "text/plain" + const filename = resource.uri?.replace(/^file:\/\//, "").split("/").pop() || "file" + const mime = resource.mimeType || "text/plain" if ("text" in resource) { - // Convert text content to base64 data URL (same pattern as images) - const base64Text = typeof btoa !== "undefined" - ? btoa(unescape(encodeURIComponent(resource.text))) - : Buffer.from(resource.text, "utf-8").toString("base64") + // Encode text to base64 data URL to preserve file metadata + const base64 = Buffer.from(resource.text, "utf-8").toString("base64") parts.push({ type: "file", - url: `data:${resourceMime};base64,${base64Text}`, - filename: resourceFilename, - mime: resourceMime, + url: `data:${mime};base64,${base64}`, + filename, + mime, }) - } else if ("blob" in resource) { + break + } + if ("blob" in resource) { // Binary blob - already base64 parts.push({ type: "file", - url: `data:${resourceMime};base64,${resource.blob}`, - filename: resourceFilename, - mime: resourceMime, + url: `data:${mime};base64,${resource.blob}`, + filename, + mime, }) } break + } default: break diff --git a/packages/opencode/src/session/message-v2.ts b/packages/opencode/src/session/message-v2.ts index 8246e3b297d..c41562559bc 100644 --- a/packages/opencode/src/session/message-v2.ts +++ b/packages/opencode/src/session/message-v2.ts @@ -436,38 +436,40 @@ export namespace MessageV2 { type: "text", text: part.text, }) - // Text-based files are converted into text parts for LLM compatibility - // Only send images and other binary formats as file parts - const isTextBasedMime = - part.type === "file" && ( + // Handle file parts based on MIME type + if (part.type === "file") { + const isTextBased = part.mime.startsWith("text/") || part.mime === "application/json" || part.mime === "application/xml" || part.mime === "application/javascript" || part.mime === "application/typescript" || part.mime === "application/x-directory" - ) - if (part.type === "file" && !isTextBasedMime) + + // Skip directory markers + if (part.mime === "application/x-directory") continue + + // Decode text-based files and send as text with filename header + if (isTextBased) { + const url = part.url + if (!url.startsWith("data:")) continue + const match = url.match(/^data:[^;]+;base64,(.*)$/) + if (!match) continue + const text = Buffer.from(match[1], "base64").toString("utf-8") + userMessage.parts.push({ + type: "text", + text: `[File: ${part.filename || "file"}]\n${text}`, + }) + continue + } + + // Send binary files (images, etc.) as file parts userMessage.parts.push({ type: "file", url: part.url, mediaType: part.mime, filename: part.filename, }) - // For text-based file parts, decode and send as text with filename header - if (part.type === "file" && isTextBasedMime && part.mime !== "application/x-directory") { - const url = part.url - if (url.startsWith("data:")) { - const base64Match = url.match(/^data:[^;]+;base64,(.*)$/) - if (base64Match) { - const base64Data = base64Match[1] - const text = Buffer.from(base64Data, "base64").toString("utf-8") - userMessage.parts.push({ - type: "text", - text: `[File: ${part.filename || "file"}]\n${text}`, - }) - } - } } if (part.type === "compaction") { From 7bfc8db2e67a3c799ae35f39c12a3a61fc870659 Mon Sep 17 00:00:00 2001 From: Lior Shkiller Date: Mon, 29 Dec 2025 10:25:55 +0200 Subject: [PATCH 6/6] fix: skip binary non-image files during ACP replay ACP resource blocks only support text content. Binary files like PDFs would be decoded as UTF-8 and produce garbage. Skip them during replay while still sending them to the LLM as file parts. --- packages/opencode/src/acp/agent.ts | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index bf20a3c446d..750d39bf408 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -710,6 +710,17 @@ export namespace ACP { continue } + // Check if this is a text-based file that can be decoded + const isTextBased = + mime.startsWith("text/") || + mime === "application/json" || + mime === "application/xml" || + mime === "application/javascript" || + mime === "application/typescript" + + // Skip binary non-image files (PDFs, etc.) - ACP resource blocks only support text + if (!isTextBased) continue + // Send as ACP resource block const text = Buffer.from(base64Data, "base64").toString("utf-8") await this.connection