diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index 8b338f1b571..479621c1dde 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -137,6 +137,7 @@ export namespace ACP { private eventStarted = false private bashSnapshots = new Map() private toolStarts = new Set() + private textSnapshots = new Map() private permissionQueues = new Map>() private permissionOptions: PermissionOption[] = [ { optionId: "once", kind: "allow_once", name: "Allow once" }, @@ -446,6 +447,46 @@ export namespace ACP { return } } + + if (part.type === "text" && part.ignored !== true) { + const delta = this.textDelta(part.id, part.text) + if (!delta) return + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "agent_message_chunk", + content: { + type: "text", + text: delta, + }, + }, + }) + .catch((error) => { + log.error("failed to send text update to ACP", { error }) + }) + return + } + + if (part.type === "reasoning") { + const delta = this.textDelta(part.id, part.text) + if (!delta) return + await this.connection + .sessionUpdate({ + sessionId, + update: { + sessionUpdate: "agent_thought_chunk", + content: { + type: "text", + text: delta, + }, + }, + }) + .catch((error) => { + log.error("failed to send reasoning update to ACP", { error }) + }) + return + } return } @@ -476,6 +517,7 @@ export namespace ACP { if (!part) return if (part.type === "text" && props.field === "text" && part.ignored !== true) { + this.textSnapshots.set(part.id, part.text) await this.connection .sessionUpdate({ sessionId, @@ -494,6 +536,7 @@ export namespace ACP { } if (part.type === "reasoning" && props.field === "text") { + this.textSnapshots.set(part.id, part.text) await this.connection .sessionUpdate({ sessionId, @@ -1085,6 +1128,14 @@ export namespace ACP { return output } + private textDelta(id: string, next: string) { + const prev = this.textSnapshots.get(id) ?? "" + this.textSnapshots.set(id, next) + if (!next) return "" + if (next.startsWith(prev)) return next.slice(prev.length) + return next + } + private async toolStart(sessionId: string, part: ToolPart) { if (this.toolStarts.has(part.callID)) return this.toolStarts.add(part.callID) diff --git a/packages/opencode/test/acp/event-subscription.test.ts b/packages/opencode/test/acp/event-subscription.test.ts index 1abf578281d..9ac40e9c196 100644 --- a/packages/opencode/test/acp/event-subscription.test.ts +++ b/packages/opencode/test/acp/event-subscription.test.ts @@ -345,6 +345,121 @@ describe("acp.agent event subscription", () => { }) }) + + test("emits text chunks from message.part.updated when delta events are absent", async () => { + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const { agent, controller, chunks, stop } = createFakeAgent() + const cwd = "/tmp/opencode-acp-test" + + const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + + controller.push({ + directory: cwd, + payload: { + type: "message.part.updated", + properties: { + part: { + id: "part_1", + sessionID: sessionId, + messageID: "msg_1", + type: "text", + text: "hello", + }, + }, + }, + } as any) + + controller.push({ + directory: cwd, + payload: { + type: "message.part.updated", + properties: { + part: { + id: "part_1", + sessionID: sessionId, + messageID: "msg_1", + type: "text", + text: "hello world", + }, + }, + }, + } as any) + + await new Promise((r) => setTimeout(r, 20)) + + expect(chunks.get(sessionId)).toBe("hello world") + stop() + }, + }) + }) + + test("does not duplicate chunks when message.part.delta is followed by message.part.updated", async () => { + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const { agent, controller, chunks, stop } = createFakeAgent() + const cwd = "/tmp/opencode-acp-test" + + const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + + controller.push({ + directory: cwd, + payload: { + type: "message.part.delta", + properties: { + sessionID: sessionId, + messageID: "msg_1", + partID: "msg_1_part", + field: "text", + delta: "hello", + }, + }, + } as any) + + controller.push({ + directory: cwd, + payload: { + type: "message.part.updated", + properties: { + part: { + id: "msg_1_part", + sessionID: sessionId, + messageID: "msg_1", + type: "text", + text: "hello", + }, + }, + }, + } as any) + + controller.push({ + directory: cwd, + payload: { + type: "message.part.updated", + properties: { + part: { + id: "msg_1_part", + sessionID: sessionId, + messageID: "msg_1", + type: "text", + text: "hello world", + }, + }, + }, + } as any) + + await new Promise((r) => setTimeout(r, 20)) + + expect(chunks.get(sessionId)).toBe("hello world") + stop() + }, + }) + }) + test("does not create additional event subscriptions on repeated loadSession()", async () => { await using tmp = await tmpdir() await Instance.provide({