From 5463e310669cbf0826f0e32acf6fcab394824835 Mon Sep 17 00:00:00 2001 From: noamzbr Date: Tue, 16 Dec 2025 15:53:44 +0200 Subject: [PATCH 1/4] fix(acp): use single global event subscription and route by sessionID Avoid per-session SSE subscriptions that caused cross-session event pollution and duplicate events on repeated loadSession(). ACP now maintains one global event stream and forwards updates to the correct ACP session using event payload sessionID. Adds ACP regression tests for cross-session isolation and subscription duplication. --- packages/opencode/src/acp/agent.ts | 537 +++++++++--------- packages/opencode/src/acp/session.ts | 4 + .../test/acp/event-subscription.test.ts | 290 ++++++++++ 3 files changed, 578 insertions(+), 253 deletions(-) create mode 100644 packages/opencode/test/acp/event-subscription.test.ts diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index d20c971ebc5..740e2679e1c 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -20,7 +20,7 @@ import { } from "@agentclientprotocol/sdk" import { Log } from "../util/log" import { ACPSessionManager } from "./session" -import type { ACPConfig, ACPSessionState } from "./types" +import type { ACPConfig } from "./types" import { Provider } from "../provider/provider" import { Installation } from "@/installation" import { MessageV2 } from "@/session/message-v2" @@ -28,7 +28,7 @@ import { Config } from "@/config/config" import { Todo } from "@/session/todo" import { z } from "zod" import { LoadAPIKeyError } from "ai" -import type { OpencodeClient, SessionMessageResponse } from "@opencode-ai/sdk/v2" +import type { Event, OpencodeClient, SessionMessageResponse } from "@opencode-ai/sdk/v2" export namespace ACP { const log = Log.create({ service: "acp-agent" }) @@ -49,285 +49,320 @@ export namespace ACP { private connection: AgentSideConnection private config: ACPConfig private sdk: OpencodeClient - private sessionManager + private sessionManager: ACPSessionManager + private eventAbort = new AbortController() + private eventStarted = false + private permissionOptions: PermissionOption[] = [ + { optionId: "once", kind: "allow_once", name: "Allow once" }, + { optionId: "always", kind: "allow_always", name: "Always allow" }, + { optionId: "reject", kind: "reject_once", name: "Reject" }, + ] constructor(connection: AgentSideConnection, config: ACPConfig) { this.connection = connection this.config = config this.sdk = config.sdk this.sessionManager = new ACPSessionManager(this.sdk) + this.startEventSubscription() } - private setupEventSubscriptions(session: ACPSessionState) { - const sessionId = session.id - const directory = session.cwd + private startEventSubscription() { + if (this.eventStarted) return + this.eventStarted = true + this.runEventSubscription().catch((error) => { + if (this.eventAbort.signal.aborted) return + log.error("event subscription failed", { error }) + }) + } - const options: PermissionOption[] = [ - { optionId: "once", kind: "allow_once", name: "Allow once" }, - { optionId: "always", kind: "allow_always", name: "Always allow" }, - { optionId: "reject", kind: "reject_once", name: "Reject" }, - ] - this.config.sdk.event.subscribe({ directory }).then(async (events) => { + private async runEventSubscription() { + while (true) { + if (this.eventAbort.signal.aborted) return + const events = await this.sdk.global.event({ + signal: this.eventAbort.signal, + }) for await (const event of events.stream) { - switch (event.type) { - case "permission.updated": - try { - const permission = event.properties - const res = await this.connection - .requestPermission({ + if (this.eventAbort.signal.aborted) return + const payload = (event as any)?.payload + if (!payload) continue + await this.handleEvent(payload as Event).catch((error) => { + log.error("failed to handle event", { error, type: payload.type }) + }) + } + } + } + + private async handleEvent(event: Event) { + switch (event.type) { + case "permission.updated": { + const permission = event.properties + const session = this.sessionManager.tryGet(permission.sessionID) + if (!session) return + const directory = session.cwd + + const res = await this.connection + .requestPermission({ + sessionId: permission.sessionID, + toolCall: { + toolCallId: permission.callID ?? permission.id, + status: "pending", + title: permission.title, + rawInput: permission.metadata, + kind: toToolKind(permission.type), + locations: toLocations(permission.type, permission.metadata), + }, + options: this.permissionOptions, + }) + .catch(async (error) => { + log.error("failed to request permission from ACP", { + error, + permissionID: permission.id, + sessionID: permission.sessionID, + }) + await this.sdk.permission.respond({ + sessionID: permission.sessionID, + permissionID: permission.id, + response: "reject", + directory, + }) + return undefined + }) + + if (!res) return + if (res.outcome.outcome !== "selected") { + await this.sdk.permission.respond({ + sessionID: permission.sessionID, + permissionID: permission.id, + response: "reject", + directory, + }) + return + } + + await this.sdk.permission.respond({ + sessionID: permission.sessionID, + permissionID: permission.id, + response: res.outcome.optionId as "once" | "always" | "reject", + directory, + }) + return + } + + case "message.part.updated": { + log.info("message part updated", { event: event.properties }) + const props = event.properties + const part = props.part + const session = this.sessionManager.tryGet(part.sessionID) + if (!session) return + const sessionId = session.id + const directory = session.cwd + + const message = await this.sdk.session + .message( + { + sessionID: part.sessionID, + messageID: part.messageID, + directory, + }, + { throwOnError: true }, + ) + .then((x) => x.data) + .catch((error) => { + log.error("unexpected error when fetching message", { error }) + return undefined + }) + + if (!message || message.info.role !== "assistant") return + + if (part.type === "tool") { + switch (part.state.status) { + case "pending": + await this.connection + .sessionUpdate({ sessionId, - toolCall: { - toolCallId: permission.callID ?? permission.id, + update: { + sessionUpdate: "tool_call", + toolCallId: part.callID, + title: part.tool, + kind: toToolKind(part.tool), status: "pending", - title: permission.title, - rawInput: permission.metadata, - kind: toToolKind(permission.type), - locations: toLocations(permission.type, permission.metadata), + locations: [], + rawInput: {}, }, - options, - }) - .catch(async (error) => { - log.error("failed to request permission from ACP", { - error, - permissionID: permission.id, - sessionID: permission.sessionID, - }) - await this.config.sdk.permission.respond({ - sessionID: permission.sessionID, - permissionID: permission.id, - response: "reject", - directory, - }) - return }) - if (!res) return - if (res.outcome.outcome !== "selected") { - await this.config.sdk.permission.respond({ - sessionID: permission.sessionID, - permissionID: permission.id, - response: "reject", - directory, + .catch((error) => { + log.error("failed to send tool pending to ACP", { error }) }) - return - } - await this.config.sdk.permission.respond({ - sessionID: permission.sessionID, - permissionID: permission.id, - response: res.outcome.optionId as "once" | "always" | "reject", - directory, - }) - } catch (err) { - log.error("unexpected error when handling permission", { error: err }) - } finally { - break - } + return - case "message.part.updated": - log.info("message part updated", { event: event.properties }) - try { - const props = event.properties - const { part } = props - - const message = await this.config.sdk.session - .message( - { - sessionID: part.sessionID, - messageID: part.messageID, - directory, + case "running": + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "in_progress", + locations: toLocations(part.tool, part.state.input), + rawInput: part.state.input, + }, + }) + .catch((error) => { + log.error("failed to send tool in_progress to ACP", { error }) + }) + return + + case "completed": { + const kind = toToolKind(part.tool) + const content: ToolCallContent[] = [ + { + type: "content", + content: { + type: "text", + text: part.state.output, }, - { throwOnError: true }, - ) - .then((x) => x.data) - .catch((err) => { - log.error("unexpected error when fetching message", { error: err }) - return undefined + }, + ] + + if (kind === "edit") { + const input = part.state.input + const filePath = typeof input["filePath"] === "string" ? input["filePath"] : "" + const oldText = typeof input["oldString"] === "string" ? input["oldString"] : "" + const newText = + typeof input["newString"] === "string" + ? input["newString"] + : typeof input["content"] === "string" + ? input["content"] + : "" + content.push({ + type: "diff", + path: filePath, + oldText, + newText, }) + } - if (!message || message.info.role !== "assistant") return - - if (part.type === "tool") { - switch (part.state.status) { - case "pending": - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "tool_call", - toolCallId: part.callID, - title: part.tool, - kind: toToolKind(part.tool), - status: "pending", - locations: [], - rawInput: {}, - }, - }) - .catch((err) => { - log.error("failed to send tool pending to ACP", { error: err }) - }) - break - case "running": - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "tool_call_update", - toolCallId: part.callID, - status: "in_progress", - locations: toLocations(part.tool, part.state.input), - rawInput: part.state.input, - }, - }) - .catch((err) => { - log.error("failed to send tool in_progress to ACP", { error: err }) - }) - break - case "completed": - const kind = toToolKind(part.tool) - const content: ToolCallContent[] = [ - { - type: "content", - content: { - type: "text", - text: part.state.output, - }, - }, - ] - - if (kind === "edit") { - const input = part.state.input - const filePath = typeof input["filePath"] === "string" ? input["filePath"] : "" - const oldText = typeof input["oldString"] === "string" ? input["oldString"] : "" - const newText = - typeof input["newString"] === "string" - ? input["newString"] - : typeof input["content"] === "string" - ? input["content"] - : "" - content.push({ - type: "diff", - path: filePath, - oldText, - newText, - }) - } - - if (part.tool === "todowrite") { - const parsedTodos = z.array(Todo.Info).safeParse(JSON.parse(part.state.output)) - if (parsedTodos.success) { - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "plan", - entries: parsedTodos.data.map((todo) => { - const status: PlanEntry["status"] = - todo.status === "cancelled" ? "completed" : (todo.status as PlanEntry["status"]) - return { - priority: "medium", - status, - content: todo.content, - } - }), - }, - }) - .catch((err) => { - log.error("failed to send session update for todo", { error: err }) - }) - } else { - log.error("failed to parse todo output", { error: parsedTodos.error }) - } - } - - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "tool_call_update", - toolCallId: part.callID, - status: "completed", - kind, - content, - title: part.state.title, - rawOutput: { - output: part.state.output, - metadata: part.state.metadata, - }, - }, - }) - .catch((err) => { - log.error("failed to send tool completed to ACP", { error: err }) - }) - break - case "error": - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "tool_call_update", - toolCallId: part.callID, - status: "failed", - content: [ - { - type: "content", - content: { - type: "text", - text: part.state.error, - }, - }, - ], - rawOutput: { - error: part.state.error, - }, - }, - }) - .catch((err) => { - log.error("failed to send tool error to ACP", { error: err }) - }) - break - } - } else if (part.type === "text") { - const delta = props.delta - if (delta && part.synthetic !== true) { + if (part.tool === "todowrite") { + const parsedTodos = z.array(Todo.Info).safeParse(JSON.parse(part.state.output)) + if (parsedTodos.success) { await this.connection .sessionUpdate({ sessionId, update: { - sessionUpdate: "agent_message_chunk", - content: { - type: "text", - text: delta, - }, + sessionUpdate: "plan", + entries: parsedTodos.data.map((todo) => { + const status: PlanEntry["status"] = + todo.status === "cancelled" ? "completed" : (todo.status as PlanEntry["status"]) + return { + priority: "medium", + status, + content: todo.content, + } + }), }, }) - .catch((err) => { - log.error("failed to send text to ACP", { error: err }) + .catch((error) => { + log.error("failed to send session update for todo", { error }) }) + } else { + log.error("failed to parse todo output", { error: parsedTodos.error }) } - } else if (part.type === "reasoning") { - const delta = props.delta - if (delta) { - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "agent_thought_chunk", + } + + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "completed", + kind, + content, + title: part.state.title, + rawOutput: { + output: part.state.output, + metadata: part.state.metadata, + }, + }, + }) + .catch((error) => { + log.error("failed to send tool completed to ACP", { error }) + }) + return + } + + case "error": + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId: part.callID, + status: "failed", + content: [ + { + type: "content", content: { type: "text", - text: delta, + text: part.state.error, }, }, - }) - .catch((err) => { - log.error("failed to send reasoning to ACP", { error: err }) - }) - } - } - } finally { - break - } + ], + rawOutput: { + error: part.state.error, + }, + }, + }) + .catch((error) => { + log.error("failed to send tool error to ACP", { error }) + }) + return + } + } + + if (part.type === "text") { + const delta = props.delta + if (delta && part.synthetic !== true) { + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "agent_message_chunk", + content: { + type: "text", + text: delta, + }, + }, + }) + .catch((error) => { + log.error("failed to send text to ACP", { error }) + }) + } + return } + + if (part.type === "reasoning") { + const delta = props.delta + if (delta) { + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "agent_thought_chunk", + content: { + type: "text", + text: delta, + }, + }, + }) + .catch((error) => { + log.error("failed to send reasoning to ACP", { error }) + }) + } + } + return } - }) + } } async initialize(params: InitializeRequest): Promise { @@ -392,8 +427,6 @@ export namespace ACP { sessionId, }) - this.setupEventSubscriptions(state) - return { sessionId, models: load.models, @@ -419,7 +452,7 @@ export namespace ACP { const model = await defaultModel(this.config, directory) // Store ACP session state - const state = await this.sessionManager.load(sessionId, params.cwd, params.mcpServers, model) + await this.sessionManager.load(sessionId, params.cwd, params.mcpServers, model) log.info("load_session", { sessionId, mcpServers: params.mcpServers.length }) @@ -429,8 +462,6 @@ export namespace ACP { sessionId, }) - this.setupEventSubscriptions(state) - // Replay session history const messages = await this.sdk.session .messages( diff --git a/packages/opencode/src/acp/session.ts b/packages/opencode/src/acp/session.ts index 70b65834705..151fa5646ba 100644 --- a/packages/opencode/src/acp/session.ts +++ b/packages/opencode/src/acp/session.ts @@ -13,6 +13,10 @@ export class ACPSessionManager { this.sdk = sdk } + tryGet(sessionId: string): ACPSessionState | undefined { + return this.sessions.get(sessionId) + } + async create(cwd: string, mcpServers: McpServer[], model?: ACPSessionState["model"]): Promise { const session = await this.sdk.session .create( diff --git a/packages/opencode/test/acp/event-subscription.test.ts b/packages/opencode/test/acp/event-subscription.test.ts new file mode 100644 index 00000000000..8b68547d13b --- /dev/null +++ b/packages/opencode/test/acp/event-subscription.test.ts @@ -0,0 +1,290 @@ +import { describe, expect, test } from "bun:test" +import { ACP } from "../../src/acp/agent" +import type { AgentSideConnection } from "@agentclientprotocol/sdk" +import type { Event } from "@opencode-ai/sdk/v2" + +type SessionUpdateParams = Parameters[0] +type RequestPermissionParams = Parameters[0] +type RequestPermissionResult = Awaited> + +type GlobalEventEnvelope = { + directory?: string + payload?: Event +} + +type EventController = { + push: (event: GlobalEventEnvelope) => void + close: () => void +} + +function createEventStream() { + const queue: GlobalEventEnvelope[] = [] + const waiters: Array<(value: GlobalEventEnvelope | undefined) => void> = [] + const state = { closed: false } + + const push = (event: GlobalEventEnvelope) => { + const waiter = waiters.shift() + if (waiter) { + waiter(event) + return + } + queue.push(event) + } + + const close = () => { + state.closed = true + for (const waiter of waiters.splice(0)) { + waiter(undefined) + } + } + + const stream = async function* (signal?: AbortSignal) { + while (true) { + if (signal?.aborted) return + const next = queue.shift() + if (next) { + yield next + continue + } + if (state.closed) return + const value = await new Promise((resolve) => { + waiters.push(resolve) + if (!signal) return + signal.addEventListener("abort", () => resolve(undefined), { once: true }) + }) + if (!value) return + yield value + } + } + + return { controller: { push, close } satisfies EventController, stream } +} + +function createFakeAgent() { + const updates = new Map() + const chunks = new Map() + const record = (sessionId: string, type: string) => { + const list = updates.get(sessionId) ?? [] + list.push(type) + updates.set(sessionId, list) + } + + const connection = { + async sessionUpdate(params: SessionUpdateParams) { + const update = params.update + const type = update?.sessionUpdate ?? "unknown" + record(params.sessionId, type) + if (update?.sessionUpdate === "agent_message_chunk") { + const content = update.content + if (content?.type !== "text") return + if (typeof content.text !== "string") return + chunks.set(params.sessionId, (chunks.get(params.sessionId) ?? "") + content.text) + } + }, + async requestPermission(_params: RequestPermissionParams): Promise { + return { outcome: { outcome: "selected", optionId: "once" } } as RequestPermissionResult + }, + } as unknown as AgentSideConnection + + const { controller, stream } = createEventStream() + const calls = { + eventSubscribe: 0, + sessionCreate: 0, + } + + const sdk = { + global: { + event: async (opts?: { signal?: AbortSignal }) => { + calls.eventSubscribe++ + return { stream: stream(opts?.signal) } + }, + }, + session: { + create: async (_params?: any) => { + calls.sessionCreate++ + return { + data: { + id: `ses_${calls.sessionCreate}`, + time: { created: new Date().toISOString() }, + }, + } + }, + get: async (_params?: any) => { + return { + data: { + id: "ses_1", + time: { created: new Date().toISOString() }, + }, + } + }, + messages: async () => { + return { data: [] } + }, + message: async () => { + return { + data: { + info: { + role: "assistant", + }, + }, + } + }, + }, + permission: { + respond: async () => { + return { data: true } + }, + }, + config: { + providers: async () => { + return { + data: { + providers: [ + { + id: "opencode", + name: "opencode", + models: { + "big-pickle": { id: "big-pickle", name: "big-pickle" }, + }, + }, + ], + }, + } + }, + }, + app: { + agents: async () => { + return { + data: [ + { + name: "build", + description: "build", + mode: "agent", + }, + ], + } + }, + }, + command: { + list: async () => { + return { data: [] } + }, + }, + mcp: { + add: async () => { + return { data: true } + }, + }, + } as any + + const agent = new ACP.Agent(connection, { + sdk, + defaultModel: { providerID: "opencode", modelID: "big-pickle" }, + } as any) + + const stop = () => { + controller.close() + ;(agent as any).eventAbort.abort() + } + + return { agent, controller, calls, updates, chunks, stop } +} + +describe("acp.agent event subscription", () => { + test("routes message.part.updated by the event sessionID (no cross-session pollution)", async () => { + const { agent, controller, updates, stop } = createFakeAgent() + const cwd = "/tmp/opencode-acp-test" + + const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + + controller.push({ + directory: cwd, + payload: { + type: "message.part.updated", + properties: { + part: { + sessionID: sessionB, + messageID: "msg_1", + type: "text", + synthetic: false, + }, + delta: "hello", + }, + }, + } as any) + + await new Promise((r) => setTimeout(r, 10)) + + expect((updates.get(sessionA) ?? []).includes("agent_message_chunk")).toBe(false) + expect((updates.get(sessionB) ?? []).includes("agent_message_chunk")).toBe(true) + + stop() + }) + + test("keeps concurrent sessions isolated when message.part.updated events are interleaved", async () => { + const { agent, controller, chunks, stop } = createFakeAgent() + const cwd = "/tmp/opencode-acp-test" + + const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + + const tokenA = ["ALPHA_", "111", "_X"] + const tokenB = ["BETA_", "222", "_Y"] + + const push = (sessionId: string, messageID: string, delta: string) => { + controller.push({ + directory: cwd, + payload: { + type: "message.part.updated", + properties: { + part: { + sessionID: sessionId, + messageID, + type: "text", + synthetic: false, + }, + delta, + }, + }, + } as any) + } + + push(sessionA, "msg_a", tokenA[0]) + push(sessionB, "msg_b", tokenB[0]) + push(sessionA, "msg_a", tokenA[1]) + push(sessionB, "msg_b", tokenB[1]) + push(sessionA, "msg_a", tokenA[2]) + push(sessionB, "msg_b", tokenB[2]) + + await new Promise((r) => setTimeout(r, 20)) + + const a = chunks.get(sessionA) ?? "" + const b = chunks.get(sessionB) ?? "" + + expect(a).toContain(tokenA.join("")) + expect(b).toContain(tokenB.join("")) + for (const part of tokenB) expect(a).not.toContain(part) + for (const part of tokenA) expect(b).not.toContain(part) + + stop() + }) + + test("does not create additional event subscriptions on repeated loadSession()", async () => { + const { agent, calls, stop } = createFakeAgent() + const cwd = "/tmp/opencode-acp-test" + + const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + + await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) + await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) + await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) + await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) + + expect(calls.eventSubscribe).toBe(1) + + stop() + }) +}) + + From c00a159466af4e7dc2b98576327ad9978f777892 Mon Sep 17 00:00:00 2001 From: noamzbr Date: Tue, 16 Dec 2025 16:09:10 +0200 Subject: [PATCH 2/4] format fix --- packages/opencode/test/acp/event-subscription.test.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/opencode/test/acp/event-subscription.test.ts b/packages/opencode/test/acp/event-subscription.test.ts index 8b68547d13b..71cba58b509 100644 --- a/packages/opencode/test/acp/event-subscription.test.ts +++ b/packages/opencode/test/acp/event-subscription.test.ts @@ -286,5 +286,3 @@ describe("acp.agent event subscription", () => { stop() }) }) - - From 33c00403a063d9e3e28033b87f348b895de5c531 Mon Sep 17 00:00:00 2001 From: noamzbr Date: Mon, 22 Dec 2025 17:03:41 +0200 Subject: [PATCH 3/4] update tests --- .../test/acp/event-subscription.test.ts | 164 ++++++++++-------- 1 file changed, 92 insertions(+), 72 deletions(-) diff --git a/packages/opencode/test/acp/event-subscription.test.ts b/packages/opencode/test/acp/event-subscription.test.ts index 71cba58b509..18bc1013586 100644 --- a/packages/opencode/test/acp/event-subscription.test.ts +++ b/packages/opencode/test/acp/event-subscription.test.ts @@ -2,6 +2,8 @@ import { describe, expect, test } from "bun:test" import { ACP } from "../../src/acp/agent" import type { AgentSideConnection } from "@agentclientprotocol/sdk" import type { Event } from "@opencode-ai/sdk/v2" +import { Instance } from "../../src/project/instance" +import { tmpdir } from "../fixture/fixture" type SessionUpdateParams = Parameters[0] type RequestPermissionParams = Parameters[0] @@ -192,97 +194,115 @@ function createFakeAgent() { describe("acp.agent event subscription", () => { test("routes message.part.updated by the event sessionID (no cross-session pollution)", async () => { - const { agent, controller, updates, stop } = createFakeAgent() - const cwd = "/tmp/opencode-acp-test" - - const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) - const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) - - controller.push({ - directory: cwd, - payload: { - type: "message.part.updated", - properties: { - part: { - sessionID: sessionB, - messageID: "msg_1", - type: "text", - synthetic: false, + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const { agent, controller, updates, stop } = createFakeAgent() + const cwd = "/tmp/opencode-acp-test" + + const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + + controller.push({ + directory: cwd, + payload: { + type: "message.part.updated", + properties: { + part: { + sessionID: sessionB, + messageID: "msg_1", + type: "text", + synthetic: false, + }, + delta: "hello", + }, }, - delta: "hello", - }, - }, - } as any) + } as any) - await new Promise((r) => setTimeout(r, 10)) + await new Promise((r) => setTimeout(r, 10)) - expect((updates.get(sessionA) ?? []).includes("agent_message_chunk")).toBe(false) - expect((updates.get(sessionB) ?? []).includes("agent_message_chunk")).toBe(true) + expect((updates.get(sessionA) ?? []).includes("agent_message_chunk")).toBe(false) + expect((updates.get(sessionB) ?? []).includes("agent_message_chunk")).toBe(true) - stop() + stop() + }, + }) }) test("keeps concurrent sessions isolated when message.part.updated events are interleaved", async () => { - const { agent, controller, chunks, stop } = createFakeAgent() - const cwd = "/tmp/opencode-acp-test" - - const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) - const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) - - const tokenA = ["ALPHA_", "111", "_X"] - const tokenB = ["BETA_", "222", "_Y"] - - const push = (sessionId: string, messageID: string, delta: string) => { - controller.push({ - directory: cwd, - payload: { - type: "message.part.updated", - properties: { - part: { - sessionID: sessionId, - messageID, - type: "text", - synthetic: false, + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const { agent, controller, chunks, stop } = createFakeAgent() + const cwd = "/tmp/opencode-acp-test" + + const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + + const tokenA = ["ALPHA_", "111", "_X"] + const tokenB = ["BETA_", "222", "_Y"] + + const push = (sessionId: string, messageID: string, delta: string) => { + controller.push({ + directory: cwd, + payload: { + type: "message.part.updated", + properties: { + part: { + sessionID: sessionId, + messageID, + type: "text", + synthetic: false, + }, + delta, + }, }, - delta, - }, - }, - } as any) - } + } as any) + } - push(sessionA, "msg_a", tokenA[0]) - push(sessionB, "msg_b", tokenB[0]) - push(sessionA, "msg_a", tokenA[1]) - push(sessionB, "msg_b", tokenB[1]) - push(sessionA, "msg_a", tokenA[2]) - push(sessionB, "msg_b", tokenB[2]) + push(sessionA, "msg_a", tokenA[0]) + push(sessionB, "msg_b", tokenB[0]) + push(sessionA, "msg_a", tokenA[1]) + push(sessionB, "msg_b", tokenB[1]) + push(sessionA, "msg_a", tokenA[2]) + push(sessionB, "msg_b", tokenB[2]) - await new Promise((r) => setTimeout(r, 20)) + await new Promise((r) => setTimeout(r, 20)) - const a = chunks.get(sessionA) ?? "" - const b = chunks.get(sessionB) ?? "" + const a = chunks.get(sessionA) ?? "" + const b = chunks.get(sessionB) ?? "" - expect(a).toContain(tokenA.join("")) - expect(b).toContain(tokenB.join("")) - for (const part of tokenB) expect(a).not.toContain(part) - for (const part of tokenA) expect(b).not.toContain(part) + expect(a).toContain(tokenA.join("")) + expect(b).toContain(tokenB.join("")) + for (const part of tokenB) expect(a).not.toContain(part) + for (const part of tokenA) expect(b).not.toContain(part) - stop() + stop() + }, + }) }) test("does not create additional event subscriptions on repeated loadSession()", async () => { - const { agent, calls, stop } = createFakeAgent() - const cwd = "/tmp/opencode-acp-test" + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const { agent, calls, stop } = createFakeAgent() + const cwd = "/tmp/opencode-acp-test" - const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) - await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) - await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) - await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) - await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) + await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) + await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) + await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) + await agent.loadSession({ sessionId, cwd, mcpServers: [] } as any) - expect(calls.eventSubscribe).toBe(1) + expect(calls.eventSubscribe).toBe(1) - stop() + stop() + }, + }) }) }) From 2e1f4f7a4d5a7993d12f4d24cee7393834e07f86 Mon Sep 17 00:00:00 2001 From: noam-v Date: Sat, 17 Jan 2026 14:42:20 +0200 Subject: [PATCH 4/4] don't block global event loop on permission prompts Use per-session promise queue for permission handling so one session's pending prompt doesn't stall events for other sessions. --- packages/opencode/src/acp/agent.ts | 123 +++++++++-------- .../test/acp/event-subscription.test.ts | 130 +++++++++++++++++- 2 files changed, 198 insertions(+), 55 deletions(-) diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index abda475638c..26f6e5fefa5 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -50,6 +50,7 @@ export namespace ACP { private sessionManager: ACPSessionManager private eventAbort = new AbortController() private eventStarted = false + private permissionQueues = new Map>() private permissionOptions: PermissionOption[] = [ { optionId: "once", kind: "allow_once", name: "Allow once" }, { optionId: "always", kind: "allow_always", name: "Always allow" }, @@ -96,67 +97,81 @@ export namespace ACP { const permission = event.properties const session = this.sessionManager.tryGet(permission.sessionID) if (!session) return - const directory = session.cwd - const res = await this.connection - .requestPermission({ - sessionId: permission.sessionID, - toolCall: { - toolCallId: permission.tool?.callID ?? permission.id, - status: "pending", - title: permission.permission, - rawInput: permission.metadata, - kind: toToolKind(permission.permission), - locations: toLocations(permission.permission, permission.metadata), - }, - options: this.permissionOptions, - }) - .catch(async (error) => { - log.error("failed to request permission from ACP", { - error, - permissionID: permission.id, - sessionID: permission.sessionID, - }) + const prev = this.permissionQueues.get(permission.sessionID) ?? Promise.resolve() + const next = prev + .then(async () => { + const directory = session.cwd + + const res = await this.connection + .requestPermission({ + sessionId: permission.sessionID, + toolCall: { + toolCallId: permission.tool?.callID ?? permission.id, + status: "pending", + title: permission.permission, + rawInput: permission.metadata, + kind: toToolKind(permission.permission), + locations: toLocations(permission.permission, permission.metadata), + }, + options: this.permissionOptions, + }) + .catch(async (error) => { + log.error("failed to request permission from ACP", { + error, + permissionID: permission.id, + sessionID: permission.sessionID, + }) + await this.sdk.permission.reply({ + requestID: permission.id, + reply: "reject", + directory, + }) + return undefined + }) + + if (!res) return + if (res.outcome.outcome !== "selected") { + await this.sdk.permission.reply({ + requestID: permission.id, + reply: "reject", + directory, + }) + return + } + + if (res.outcome.optionId !== "reject" && permission.permission == "edit") { + const metadata = permission.metadata || {} + const filepath = typeof metadata["filepath"] === "string" ? metadata["filepath"] : "" + const diff = typeof metadata["diff"] === "string" ? metadata["diff"] : "" + + const content = await Bun.file(filepath).text() + const newContent = getNewContent(content, diff) + + if (newContent) { + this.connection.writeTextFile({ + sessionId: session.id, + path: filepath, + content: newContent, + }) + } + } + await this.sdk.permission.reply({ requestID: permission.id, - reply: "reject", + reply: res.outcome.optionId as "once" | "always" | "reject", directory, }) - return undefined }) - - if (!res) return - if (res.outcome.outcome !== "selected") { - await this.sdk.permission.reply({ - requestID: permission.id, - reply: "reject", - directory, + .catch((error) => { + log.error("failed to handle permission", { error, permissionID: permission.id }) }) - return - } - - if (res.outcome.optionId !== "reject" && permission.permission == "edit") { - const metadata = permission.metadata || {} - const filepath = typeof metadata["filepath"] === "string" ? metadata["filepath"] : "" - const diff = typeof metadata["diff"] === "string" ? metadata["diff"] : "" - - const content = await Bun.file(filepath).text() - const newContent = getNewContent(content, diff) - - if (newContent) { - this.connection.writeTextFile({ - sessionId: session.id, - path: filepath, - content: newContent, - }) - } - } - - await this.sdk.permission.reply({ - requestID: permission.id, - reply: res.outcome.optionId as "once" | "always" | "reject", - directory, - }) + .finally(() => { + if (this.permissionQueues.get(permission.sessionID) === next) { + this.permissionQueues.delete(permission.sessionID) + } + }) + this.permissionQueues.set(permission.sessionID, next) return } diff --git a/packages/opencode/test/acp/event-subscription.test.ts b/packages/opencode/test/acp/event-subscription.test.ts index 18bc1013586..8e139ff5973 100644 --- a/packages/opencode/test/acp/event-subscription.test.ts +++ b/packages/opencode/test/acp/event-subscription.test.ts @@ -189,7 +189,7 @@ function createFakeAgent() { ;(agent as any).eventAbort.abort() } - return { agent, controller, calls, updates, chunks, stop } + return { agent, controller, calls, updates, chunks, stop, sdk, connection } } describe("acp.agent event subscription", () => { @@ -305,4 +305,132 @@ describe("acp.agent event subscription", () => { }, }) }) + + test("permission.asked events are handled and replied", async () => { + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const permissionReplies: string[] = [] + const { agent, controller, stop, sdk } = createFakeAgent() + sdk.permission.reply = async (params: any) => { + permissionReplies.push(params.requestID) + return { data: true } + } + const cwd = "/tmp/opencode-acp-test" + + const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + + controller.push({ + directory: cwd, + payload: { + type: "permission.asked", + properties: { + id: "perm_1", + sessionID: sessionA, + permission: "bash", + patterns: ["*"], + metadata: {}, + always: [], + }, + }, + } as any) + + await new Promise((r) => setTimeout(r, 20)) + + expect(permissionReplies).toContain("perm_1") + + stop() + }, + }) + }) + + test("permission prompt on session A does not block message updates for session B", async () => { + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const permissionReplies: string[] = [] + let resolvePermissionA: (() => void) | undefined + const permissionABlocking = new Promise((r) => { + resolvePermissionA = r + }) + + const { agent, controller, chunks, stop, sdk, connection } = createFakeAgent() + + // Make permission request for session A block until we release it + const originalRequestPermission = connection.requestPermission.bind(connection) + let permissionCalls = 0 + connection.requestPermission = async (params: RequestPermissionParams) => { + permissionCalls++ + if (params.sessionId.endsWith("1")) { + await permissionABlocking + } + return originalRequestPermission(params) + } + + sdk.permission.reply = async (params: any) => { + permissionReplies.push(params.requestID) + return { data: true } + } + + const cwd = "/tmp/opencode-acp-test" + + const sessionA = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + const sessionB = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + + // Push permission.asked for session A (will block) + controller.push({ + directory: cwd, + payload: { + type: "permission.asked", + properties: { + id: "perm_a", + sessionID: sessionA, + permission: "bash", + patterns: ["*"], + metadata: {}, + always: [], + }, + }, + } as any) + + // Give time for permission handling to start + await new Promise((r) => setTimeout(r, 10)) + + // Push message for session B while A's permission is pending + controller.push({ + directory: cwd, + payload: { + type: "message.part.updated", + properties: { + part: { + sessionID: sessionB, + messageID: "msg_b", + type: "text", + synthetic: false, + }, + delta: "session_b_message", + }, + }, + } as any) + + // Wait for session B's message to be processed + await new Promise((r) => setTimeout(r, 20)) + + // Session B should have received message even though A's permission is still pending + expect(chunks.get(sessionB) ?? "").toContain("session_b_message") + expect(permissionReplies).not.toContain("perm_a") + + // Release session A's permission + resolvePermissionA!() + await new Promise((r) => setTimeout(r, 20)) + + // Now session A's permission should be replied + expect(permissionReplies).toContain("perm_a") + + stop() + }, + }) + }) })