diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index 8b338f1b571..b065623d52d 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -137,6 +137,7 @@ export namespace ACP { private eventStarted = false private bashSnapshots = new Map() private toolStarts = new Set() + private textSnapshots = new Map() private permissionQueues = new Map>() private permissionOptions: PermissionOption[] = [ { optionId: "once", kind: "allow_once", name: "Allow once" }, @@ -446,6 +447,18 @@ export namespace ACP { return } } + + if (part.type === "text" && part.ignored !== true) { + const delta = this.textDelta(part.id, part.text) + if (delta) await this.sendTextChunk(sessionId, "agent_message_chunk", delta) + return + } + + if (part.type === "reasoning") { + const delta = this.textDelta(part.id, part.text) + if (delta) await this.sendTextChunk(sessionId, "agent_thought_chunk", delta) + return + } return } @@ -476,38 +489,14 @@ export namespace ACP { if (!part) return if (part.type === "text" && props.field === "text" && part.ignored !== true) { - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "agent_message_chunk", - content: { - type: "text", - text: props.delta, - }, - }, - }) - .catch((error) => { - log.error("failed to send text delta to ACP", { error }) - }) + this.textSnapshots.set(part.id, (this.textSnapshots.get(part.id) ?? "") + props.delta) + await this.sendTextChunk(sessionId, "agent_message_chunk", props.delta) return } if (part.type === "reasoning" && props.field === "text") { - await this.connection - .sessionUpdate({ - sessionId, - update: { - sessionUpdate: "agent_thought_chunk", - content: { - type: "text", - text: props.delta, - }, - }, - }) - .catch((error) => { - log.error("failed to send reasoning delta to ACP", { error }) - }) + this.textSnapshots.set(part.id, (this.textSnapshots.get(part.id) ?? "") + props.delta) + await this.sendTextChunk(sessionId, "agent_thought_chunk", props.delta) } return } @@ -1085,6 +1074,31 @@ export namespace ACP { return output } + private textDelta(id: string, next: string) { + const prev = this.textSnapshots.get(id) ?? "" + this.textSnapshots.set(id, next) + if (!next) return "" + if (next.startsWith(prev)) return next.slice(prev.length) + // Fallback: if text was edited mid-stream (rare), send full text to ensure client gets complete content + return next + } + + private async sendTextChunk( + sessionId: string, + update: "agent_message_chunk" | "agent_thought_chunk", + text: string, + ) { + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: update, + content: { type: "text", text }, + }, + }) + .catch((error) => log.error(`failed to send ${update} to ACP`, { error })) + } + private async toolStart(sessionId: string, part: ToolPart) { if (this.toolStarts.has(part.callID)) return this.toolStarts.add(part.callID) diff --git a/packages/opencode/test/acp/event-subscription.test.ts b/packages/opencode/test/acp/event-subscription.test.ts index 1abf578281d..e25b49ab65a 100644 --- a/packages/opencode/test/acp/event-subscription.test.ts +++ b/packages/opencode/test/acp/event-subscription.test.ts @@ -1,8 +1,8 @@ import { describe, expect, test } from "bun:test" -import { ACP } from "../../src/acp/agent" +import { ACP } from "@/acp/agent.ts" import type { AgentSideConnection } from "@agentclientprotocol/sdk" import type { Event, EventMessagePartUpdated, ToolStatePending, ToolStateRunning } from "@opencode-ai/sdk/v2" -import { Instance } from "../../src/project/instance" +import { Instance } from "@/project/instance.ts" import { tmpdir } from "../fixture/fixture" type SessionUpdateParams = Parameters[0] @@ -136,7 +136,6 @@ function createFakeAgent() { if (update?.sessionUpdate === "agent_message_chunk") { const content = update.content if (content?.type !== "text") return - if (typeof content.text !== "string") return chunks.set(params.sessionId, (chunks.get(params.sessionId) ?? "") + content.text) } }, @@ -345,6 +344,279 @@ describe("acp.agent event subscription", () => { }) }) + test("emits text chunks from message.part.updated when delta events are absent", async () => { + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const { agent, controller, chunks, stop } = createFakeAgent() + const cwd = "/tmp/opencode-acp-test" + + const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + + controller.push({ + directory: cwd, + payload: { + type: "message.part.updated", + properties: { + part: { + id: "part_1", + sessionID: sessionId, + messageID: "msg_1", + type: "text", + text: "hello", + }, + }, + }, + } as any) + + controller.push({ + directory: cwd, + payload: { + type: "message.part.updated", + properties: { + part: { + id: "part_1", + sessionID: sessionId, + messageID: "msg_1", + type: "text", + text: "hello world", + }, + }, + }, + } as any) + + await new Promise((r) => setTimeout(r, 20)) + + expect(chunks.get(sessionId)).toBe("hello world") + stop() + }, + }) + }) + + test("emits reasoning chunks from message.part.updated when delta events are absent", async () => { + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const thoughtChunks = new Map() + const { agent, controller, sessionUpdates, stop } = createFakeAgent() + const cwd = "/tmp/opencode-acp-test" + + // Track thought chunks + const originalSessionUpdate = sessionUpdates.push.bind(sessionUpdates) + const connection = (agent as any).connection + connection.sessionUpdate = async (params: SessionUpdateParams) => { + if (params.update?.sessionUpdate === "agent_thought_chunk") { + const content = params.update.content + if (content?.type === "text") { + thoughtChunks.set(params.sessionId, (thoughtChunks.get(params.sessionId) ?? "") + content.text) + } + } + return originalSessionUpdate(params) + } + + const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + + controller.push({ + directory: cwd, + payload: { + type: "message.part.updated", + properties: { + part: { + id: "reasoning_1", + sessionID: sessionId, + messageID: "msg_1", + type: "reasoning", + text: "thinking...", + time: { start: Date.now() }, + }, + }, + }, + } as any) + + controller.push({ + directory: cwd, + payload: { + type: "message.part.updated", + properties: { + part: { + id: "reasoning_1", + sessionID: sessionId, + messageID: "msg_1", + type: "reasoning", + text: "thinking... deeply", + time: { start: Date.now() }, + }, + }, + }, + } as any) + + await new Promise((r) => setTimeout(r, 20)) + + expect(thoughtChunks.get(sessionId)).toBe("thinking... deeply") + stop() + }, + }) + }) + + test("does not duplicate chunks when message.part.delta is followed by message.part.updated", async () => { + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const { agent, controller, chunks, stop } = createFakeAgent() + const cwd = "/tmp/opencode-acp-test" + + const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + + controller.push({ + directory: cwd, + payload: { + type: "message.part.delta", + properties: { + sessionID: sessionId, + messageID: "msg_1", + partID: "msg_1_part", + field: "text", + delta: "hello", + }, + }, + } as any) + + controller.push({ + directory: cwd, + payload: { + type: "message.part.updated", + properties: { + part: { + id: "msg_1_part", + sessionID: sessionId, + messageID: "msg_1", + type: "text", + text: "hello", + }, + }, + }, + } as any) + + controller.push({ + directory: cwd, + payload: { + type: "message.part.updated", + properties: { + part: { + id: "msg_1_part", + sessionID: sessionId, + messageID: "msg_1", + type: "text", + text: "hello world", + }, + }, + }, + } as any) + + await new Promise((r) => setTimeout(r, 20)) + + expect(chunks.get(sessionId)).toBe("hello world") + stop() + }, + }) + }) + + test("does not duplicate reasoning chunks when delta is followed by updated", async () => { + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const thoughtChunks = new Map() + const { agent, controller, stop, sdk } = createFakeAgent() + const cwd = "/tmp/opencode-acp-test" + + // Override message fetch to return reasoning part + sdk.session.message = async (params?: any) => ({ + data: { + info: { role: "assistant" }, + parts: [ + { + id: params?.messageID ? `${params.messageID}_part` : "part_1", + type: "reasoning", + text: "", + time: { start: Date.now() }, + }, + ], + }, + }) + + // Track thought chunks + const connection = (agent as any).connection + connection.sessionUpdate = async (params: SessionUpdateParams) => { + if (params.update?.sessionUpdate === "agent_thought_chunk") { + const content = params.update.content + if (content?.type === "text") { + thoughtChunks.set(params.sessionId, (thoughtChunks.get(params.sessionId) ?? "") + content.text) + } + } + } + + const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + + controller.push({ + directory: cwd, + payload: { + type: "message.part.delta", + properties: { + sessionID: sessionId, + messageID: "msg_1", + partID: "msg_1_part", + field: "text", + delta: "thinking", + }, + }, + } as any) + + controller.push({ + directory: cwd, + payload: { + type: "message.part.updated", + properties: { + part: { + id: "msg_1_part", + sessionID: sessionId, + messageID: "msg_1", + type: "reasoning", + text: "thinking", + time: { start: Date.now() }, + }, + }, + }, + } as any) + + controller.push({ + directory: cwd, + payload: { + type: "message.part.updated", + properties: { + part: { + id: "msg_1_part", + sessionID: sessionId, + messageID: "msg_1", + type: "reasoning", + text: "thinking deeply", + time: { start: Date.now() }, + }, + }, + }, + } as any) + + await new Promise((r) => setTimeout(r, 20)) + + expect(thoughtChunks.get(sessionId)).toBe("thinking deeply") + stop() + }, + }) + }) + test("does not create additional event subscriptions on repeated loadSession()", async () => { await using tmp = await tmpdir() await Instance.provide({