Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions packages/opencode/src/acp/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ export namespace ACP {
private eventStarted = false
private bashSnapshots = new Map<string, string>()
private toolStarts = new Set<string>()
private textSnapshots = new Map<string, string>()
private permissionQueues = new Map<string, Promise<void>>()
private permissionOptions: PermissionOption[] = [
{ optionId: "once", kind: "allow_once", name: "Allow once" },
Expand Down Expand Up @@ -446,6 +447,46 @@ export namespace ACP {
return
}
}

if (part.type === "text" && part.ignored !== true) {
const delta = this.textDelta(part.id, part.text)
if (!delta) return
await this.connection
.sessionUpdate({
sessionId,
update: {
sessionUpdate: "agent_message_chunk",
content: {
type: "text",
text: delta,
},
},
})
.catch((error) => {
log.error("failed to send text update to ACP", { error })
})
return
}

if (part.type === "reasoning") {
const delta = this.textDelta(part.id, part.text)
if (!delta) return
await this.connection
.sessionUpdate({
sessionId,
update: {
sessionUpdate: "agent_thought_chunk",
content: {
type: "text",
text: delta,
},
},
})
.catch((error) => {
log.error("failed to send reasoning update to ACP", { error })
})
return
}
return
}

Expand Down Expand Up @@ -476,6 +517,7 @@ export namespace ACP {
if (!part) return

if (part.type === "text" && props.field === "text" && part.ignored !== true) {
this.textSnapshots.set(part.id, part.text)
await this.connection
.sessionUpdate({
sessionId,
Expand All @@ -494,6 +536,7 @@ export namespace ACP {
}

if (part.type === "reasoning" && props.field === "text") {
this.textSnapshots.set(part.id, part.text)
await this.connection
.sessionUpdate({
sessionId,
Expand Down Expand Up @@ -1085,6 +1128,14 @@ export namespace ACP {
return output
}

private textDelta(id: string, next: string) {
const prev = this.textSnapshots.get(id) ?? ""
this.textSnapshots.set(id, next)
if (!next) return ""
if (next.startsWith(prev)) return next.slice(prev.length)
return next
}

private async toolStart(sessionId: string, part: ToolPart) {
if (this.toolStarts.has(part.callID)) return
this.toolStarts.add(part.callID)
Expand Down
115 changes: 115 additions & 0 deletions packages/opencode/test/acp/event-subscription.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,121 @@ describe("acp.agent event subscription", () => {
})
})


test("emits text chunks from message.part.updated when delta events are absent", async () => {
await using tmp = await tmpdir()
await Instance.provide({
directory: tmp.path,
fn: async () => {
const { agent, controller, chunks, stop } = createFakeAgent()
const cwd = "/tmp/opencode-acp-test"

const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)

controller.push({
directory: cwd,
payload: {
type: "message.part.updated",
properties: {
part: {
id: "part_1",
sessionID: sessionId,
messageID: "msg_1",
type: "text",
text: "hello",
},
},
},
} as any)

controller.push({
directory: cwd,
payload: {
type: "message.part.updated",
properties: {
part: {
id: "part_1",
sessionID: sessionId,
messageID: "msg_1",
type: "text",
text: "hello world",
},
},
},
} as any)

await new Promise((r) => setTimeout(r, 20))

expect(chunks.get(sessionId)).toBe("hello world")
stop()
},
})
})

test("does not duplicate chunks when message.part.delta is followed by message.part.updated", async () => {
await using tmp = await tmpdir()
await Instance.provide({
directory: tmp.path,
fn: async () => {
const { agent, controller, chunks, stop } = createFakeAgent()
const cwd = "/tmp/opencode-acp-test"

const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId)

controller.push({
directory: cwd,
payload: {
type: "message.part.delta",
properties: {
sessionID: sessionId,
messageID: "msg_1",
partID: "msg_1_part",
field: "text",
delta: "hello",
},
},
} as any)

controller.push({
directory: cwd,
payload: {
type: "message.part.updated",
properties: {
part: {
id: "msg_1_part",
sessionID: sessionId,
messageID: "msg_1",
type: "text",
text: "hello",
},
},
},
} as any)

controller.push({
directory: cwd,
payload: {
type: "message.part.updated",
properties: {
part: {
id: "msg_1_part",
sessionID: sessionId,
messageID: "msg_1",
type: "text",
text: "hello world",
},
},
},
} as any)

await new Promise((r) => setTimeout(r, 20))

expect(chunks.get(sessionId)).toBe("hello world")
stop()
},
})
})

test("does not create additional event subscriptions on repeated loadSession()", async () => {
await using tmp = await tmpdir()
await Instance.provide({
Expand Down