diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index e7532d20073..e64f75e6ba9 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -18,8 +18,19 @@ import { Question } from "@/question" export namespace SessionProcessor { const DOOM_LOOP_THRESHOLD = 3 + const STREAM_ABORT_MAX_RETRIES = 3 const log = Log.create({ service: "session.processor" }) + export class StreamAbortedError extends Error { + constructor( + public readonly reason: string, + public readonly partialContent: string, + ) { + super(`Stream aborted by plugin: ${reason}`) + this.name = "StreamAbortedError" + } + } + export type Info = Awaited> export type Result = Awaited> @@ -30,9 +41,11 @@ export namespace SessionProcessor { abort: AbortSignal }) { const toolcalls: Record = {} + const toolInputAccumulated: Record = {} let snapshot: string | undefined let blocked = false let attempt = 0 + let streamAbortRetries = 0 let needsCompaction = false const result = { @@ -42,9 +55,10 @@ export namespace SessionProcessor { partFromToolCall(toolCallID: string) { return toolcalls[toolCallID] }, - async process(streamInput: LLM.StreamInput) { + async process(initialStreamInput: LLM.StreamInput) { log.info("process") needsCompaction = false + let streamInput = initialStreamInput const shouldBreak = (await Config.get()).experimental?.continue_loop_on_deny !== true while (true) { try { @@ -90,6 +104,22 @@ export namespace SessionProcessor { field: "text", delta: value.text, }) + // Fire stream.delta hook for reasoning deltas + const reasoningDeltaResult = await Plugin.trigger( + "stream.delta", + { + sessionID: input.sessionID, + messageID: input.assistantMessage.id, + type: "reasoning-delta" as const, + delta: value.text, + accumulated: part.text, + }, + { abort: false, reason: "" }, + ) + if (reasoningDeltaResult.abort) { + log.info("stream aborted by hook (reasoning-delta)", { reason: reasoningDeltaResult.reason }) + throw new StreamAbortedError(reasoningDeltaResult.reason, part.text) + } } break @@ -125,8 +155,29 @@ export namespace SessionProcessor { toolcalls[value.id] = part as MessageV2.ToolPart break - case "tool-input-delta": + case "tool-input-delta": { + const id = (value as any).id as string + const delta = (value as any).delta as string + if (id && delta) { + toolInputAccumulated[id] = (toolInputAccumulated[id] ?? "") + delta + const toolDeltaResult = await Plugin.trigger( + "stream.delta", + { + sessionID: input.sessionID, + messageID: input.assistantMessage.id, + type: "tool-input-delta" as const, + delta, + accumulated: toolInputAccumulated[id], + }, + { abort: false, reason: "" }, + ) + if (toolDeltaResult.abort) { + log.info("stream aborted by hook (tool-input-delta)", { reason: toolDeltaResult.reason }) + throw new StreamAbortedError(toolDeltaResult.reason, toolInputAccumulated[id]) + } + } break + } case "tool-input-end": break @@ -310,6 +361,22 @@ export namespace SessionProcessor { field: "text", delta: value.text, }) + // Fire stream.delta hook for text deltas + const textDeltaResult = await Plugin.trigger( + "stream.delta", + { + sessionID: input.sessionID, + messageID: input.assistantMessage.id, + type: "text-delta" as const, + delta: value.text, + accumulated: currentText.text, + }, + { abort: false, reason: "" }, + ) + if (textDeltaResult.abort) { + log.info("stream aborted by hook (text-delta)", { reason: textDeltaResult.reason }) + throw new StreamAbortedError(textDeltaResult.reason, currentText.text) + } } break @@ -348,6 +415,47 @@ export namespace SessionProcessor { if (needsCompaction) break } } catch (e: any) { + // Handle stream abort from plugin hooks + if (e instanceof StreamAbortedError) { + log.info("handling stream abort", { reason: e.reason }) + const abortedResult = await Plugin.trigger( + "stream.aborted", + { + sessionID: input.sessionID, + messageID: input.assistantMessage.id, + reason: e.reason, + partialContent: e.partialContent, + }, + { retry: false, injectMessage: "" }, + ) + if (abortedResult.retry && streamAbortRetries < STREAM_ABORT_MAX_RETRIES) { + streamAbortRetries++ + log.info("retrying after stream abort", { + attempt: streamAbortRetries, + injectMessage: abortedResult.injectMessage ? "yes" : "no", + }) + if (abortedResult.injectMessage) { + streamInput = { + ...streamInput, + messages: [ + ...streamInput.messages, + { + role: "user" as const, + content: [{ type: "text" as const, text: abortedResult.injectMessage }], + }, + ], + } + } + continue + } + // Not retrying — fall through to normal error handling + log.info("stream abort not retried", { + retry: abortedResult.retry, + retries: streamAbortRetries, + maxRetries: STREAM_ABORT_MAX_RETRIES, + }) + } + log.error("process", { error: e, stack: JSON.stringify(e.stack), diff --git a/packages/opencode/test/session/stream-hooks.test.ts b/packages/opencode/test/session/stream-hooks.test.ts new file mode 100644 index 00000000000..d71808d1918 --- /dev/null +++ b/packages/opencode/test/session/stream-hooks.test.ts @@ -0,0 +1,112 @@ +import { describe, expect, test } from "bun:test" +import { SessionProcessor } from "../../src/session/processor" + +describe("session.processor.StreamAbortedError", () => { + test("stores reason and partial content", () => { + const err = new SessionProcessor.StreamAbortedError("bad content", "partial text here") + expect(err.reason).toBe("bad content") + expect(err.partialContent).toBe("partial text here") + expect(err.name).toBe("StreamAbortedError") + expect(err.message).toBe("Stream aborted by plugin: bad content") + }) + + test("is an instance of Error", () => { + const err = new SessionProcessor.StreamAbortedError("test", "") + expect(err).toBeInstanceOf(Error) + expect(err).toBeInstanceOf(SessionProcessor.StreamAbortedError) + }) + + test("has a stack trace", () => { + const err = new SessionProcessor.StreamAbortedError("reason", "content") + expect(err.stack).toBeDefined() + expect(err.stack).toContain("StreamAbortedError") + }) + + test("handles empty reason and content", () => { + const err = new SessionProcessor.StreamAbortedError("", "") + expect(err.reason).toBe("") + expect(err.partialContent).toBe("") + expect(err.message).toBe("Stream aborted by plugin: ") + }) +}) + +describe("stream hook type contracts", () => { + test("stream.delta input shape matches expected fields", () => { + // Verify the hook input type has the required fields + const input = { + sessionID: "sess-1", + messageID: "msg-1", + type: "text-delta" as const, + delta: "hello", + accumulated: "hello world", + } + expect(input.sessionID).toBe("sess-1") + expect(input.type).toBe("text-delta") + expect(input.delta).toBe("hello") + expect(input.accumulated).toBe("hello world") + }) + + test("stream.delta supports all three delta types", () => { + const types = ["text-delta", "reasoning-delta", "tool-input-delta"] as const + for (const t of types) { + const input = { + sessionID: "s", + messageID: "m", + type: t, + delta: "d", + accumulated: "a", + } + expect(input.type).toBe(t) + } + }) + + test("stream.delta output defaults to no-abort", () => { + const output = { abort: false, reason: "" } + expect(output.abort).toBe(false) + expect(output.reason).toBe("") + }) + + test("stream.aborted input shape matches expected fields", () => { + const input = { + sessionID: "sess-1", + messageID: "msg-1", + reason: "content policy violation", + partialContent: "some partial output", + } + expect(input.reason).toBe("content policy violation") + expect(input.partialContent).toBe("some partial output") + }) + + test("stream.aborted output defaults to no-retry", () => { + const output = { retry: false, injectMessage: "" } + expect(output.retry).toBe(false) + expect(output.injectMessage).toBe("") + }) + + test("stream.aborted output supports retry with message injection", () => { + const output = { + retry: true, + injectMessage: "Please avoid SQL statements in your response.", + } + expect(output.retry).toBe(true) + expect(output.injectMessage).toContain("SQL") + }) +}) + +describe("STREAM_ABORT_MAX_RETRIES constant", () => { + test("StreamAbortedError can be caught and inspected in a retry loop", () => { + const maxRetries = 3 + let retries = 0 + const errors: SessionProcessor.StreamAbortedError[] = [] + + while (retries < maxRetries) { + const err = new SessionProcessor.StreamAbortedError(`attempt ${retries + 1}`, `content-${retries}`) + errors.push(err) + retries++ + } + + expect(errors).toHaveLength(3) + expect(errors[0].reason).toBe("attempt 1") + expect(errors[2].partialContent).toBe("content-2") + }) +}) diff --git a/packages/plugin/src/index.ts b/packages/plugin/src/index.ts index 76370d1d5a7..973e8374928 100644 --- a/packages/plugin/src/index.ts +++ b/packages/plugin/src/index.ts @@ -231,4 +231,49 @@ export interface Hooks { * Modify tool definitions (description and parameters) sent to LLM */ "tool.definition"?: (input: { toolID: string }, output: { description: string; parameters: any }) => Promise + /** + * Called on every streaming delta event (text-delta, reasoning-delta, + * tool-input-delta). Allows plugins to observe the stream in real time + * and optionally request an abort. + * + * Setting `output.abort` to `true` will cancel the current stream. + * The `stream.aborted` hook will then be called with the abort reason. + * + * Use cases: TTSR (Time-To-Stream Rules), content filtering, real-time + * monitoring, pattern detection. + */ + "stream.delta"?: ( + input: { + sessionID: string + messageID: string + type: "text-delta" | "reasoning-delta" | "tool-input-delta" + delta: string + accumulated: string + }, + output: { + abort: boolean + reason: string + }, + ) => Promise + /** + * Called after a stream is aborted by a `stream.delta` hook. Allows + * plugins to decide whether to retry the request with optional + * corrective context injected into the conversation. + * + * Setting `output.retry` to `true` will discard the aborted response + * and start a new streaming request. If `output.injectMessage` is set, + * it will be added as a user message before the retry. + */ + "stream.aborted"?: ( + input: { + sessionID: string + messageID: string + reason: string + partialContent: string + }, + output: { + retry: boolean + injectMessage: string + }, + ) => Promise }