diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index e7532d20073..e64f75e6ba9 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -18,8 +18,19 @@ import { Question } from "@/question" export namespace SessionProcessor { const DOOM_LOOP_THRESHOLD = 3 + const STREAM_ABORT_MAX_RETRIES = 3 const log = Log.create({ service: "session.processor" }) + export class StreamAbortedError extends Error { + constructor( + public readonly reason: string, + public readonly partialContent: string, + ) { + super(`Stream aborted by plugin: ${reason}`) + this.name = "StreamAbortedError" + } + } + export type Info = Awaited> export type Result = Awaited> @@ -30,9 +41,11 @@ export namespace SessionProcessor { abort: AbortSignal }) { const toolcalls: Record = {} + const toolInputAccumulated: Record = {} let snapshot: string | undefined let blocked = false let attempt = 0 + let streamAbortRetries = 0 let needsCompaction = false const result = { @@ -42,9 +55,10 @@ export namespace SessionProcessor { partFromToolCall(toolCallID: string) { return toolcalls[toolCallID] }, - async process(streamInput: LLM.StreamInput) { + async process(initialStreamInput: LLM.StreamInput) { log.info("process") needsCompaction = false + let streamInput = initialStreamInput const shouldBreak = (await Config.get()).experimental?.continue_loop_on_deny !== true while (true) { try { @@ -90,6 +104,22 @@ export namespace SessionProcessor { field: "text", delta: value.text, }) + // Fire stream.delta hook for reasoning deltas + const reasoningDeltaResult = await Plugin.trigger( + "stream.delta", + { + sessionID: input.sessionID, + messageID: input.assistantMessage.id, + type: "reasoning-delta" as const, + delta: value.text, + accumulated: part.text, + }, + { abort: false, reason: "" }, + ) + if (reasoningDeltaResult.abort) { + log.info("stream aborted by hook (reasoning-delta)", { reason: reasoningDeltaResult.reason }) + throw new StreamAbortedError(reasoningDeltaResult.reason, part.text) + } } break @@ -125,8 +155,29 @@ export namespace SessionProcessor { toolcalls[value.id] = part as MessageV2.ToolPart break - case "tool-input-delta": + case "tool-input-delta": { + const id = (value as any).id as string + const delta = (value as any).delta as string + if (id && delta) { + toolInputAccumulated[id] = (toolInputAccumulated[id] ?? "") + delta + const toolDeltaResult = await Plugin.trigger( + "stream.delta", + { + sessionID: input.sessionID, + messageID: input.assistantMessage.id, + type: "tool-input-delta" as const, + delta, + accumulated: toolInputAccumulated[id], + }, + { abort: false, reason: "" }, + ) + if (toolDeltaResult.abort) { + log.info("stream aborted by hook (tool-input-delta)", { reason: toolDeltaResult.reason }) + throw new StreamAbortedError(toolDeltaResult.reason, toolInputAccumulated[id]) + } + } break + } case "tool-input-end": break @@ -310,6 +361,22 @@ export namespace SessionProcessor { field: "text", delta: value.text, }) + // Fire stream.delta hook for text deltas + const textDeltaResult = await Plugin.trigger( + "stream.delta", + { + sessionID: input.sessionID, + messageID: input.assistantMessage.id, + type: "text-delta" as const, + delta: value.text, + accumulated: currentText.text, + }, + { abort: false, reason: "" }, + ) + if (textDeltaResult.abort) { + log.info("stream aborted by hook (text-delta)", { reason: textDeltaResult.reason }) + throw new StreamAbortedError(textDeltaResult.reason, currentText.text) + } } break @@ -348,6 +415,47 @@ export namespace SessionProcessor { if (needsCompaction) break } } catch (e: any) { + // Handle stream abort from plugin hooks + if (e instanceof StreamAbortedError) { + log.info("handling stream abort", { reason: e.reason }) + const abortedResult = await Plugin.trigger( + "stream.aborted", + { + sessionID: input.sessionID, + messageID: input.assistantMessage.id, + reason: e.reason, + partialContent: e.partialContent, + }, + { retry: false, injectMessage: "" }, + ) + if (abortedResult.retry && streamAbortRetries < STREAM_ABORT_MAX_RETRIES) { + streamAbortRetries++ + log.info("retrying after stream abort", { + attempt: streamAbortRetries, + injectMessage: abortedResult.injectMessage ? "yes" : "no", + }) + if (abortedResult.injectMessage) { + streamInput = { + ...streamInput, + messages: [ + ...streamInput.messages, + { + role: "user" as const, + content: [{ type: "text" as const, text: abortedResult.injectMessage }], + }, + ], + } + } + continue + } + // Not retrying — fall through to normal error handling + log.info("stream abort not retried", { + retry: abortedResult.retry, + retries: streamAbortRetries, + maxRetries: STREAM_ABORT_MAX_RETRIES, + }) + } + log.error("process", { error: e, stack: JSON.stringify(e.stack), diff --git a/packages/plugin/src/index.ts b/packages/plugin/src/index.ts index 76370d1d5a7..973e8374928 100644 --- a/packages/plugin/src/index.ts +++ b/packages/plugin/src/index.ts @@ -231,4 +231,49 @@ export interface Hooks { * Modify tool definitions (description and parameters) sent to LLM */ "tool.definition"?: (input: { toolID: string }, output: { description: string; parameters: any }) => Promise + /** + * Called on every streaming delta event (text-delta, reasoning-delta, + * tool-input-delta). Allows plugins to observe the stream in real time + * and optionally request an abort. + * + * Setting `output.abort` to `true` will cancel the current stream. + * The `stream.aborted` hook will then be called with the abort reason. + * + * Use cases: TTSR (Time-To-Stream Rules), content filtering, real-time + * monitoring, pattern detection. + */ + "stream.delta"?: ( + input: { + sessionID: string + messageID: string + type: "text-delta" | "reasoning-delta" | "tool-input-delta" + delta: string + accumulated: string + }, + output: { + abort: boolean + reason: string + }, + ) => Promise + /** + * Called after a stream is aborted by a `stream.delta` hook. Allows + * plugins to decide whether to retry the request with optional + * corrective context injected into the conversation. + * + * Setting `output.retry` to `true` will discard the aborted response + * and start a new streaming request. If `output.injectMessage` is set, + * it will be added as a user message before the retry. + */ + "stream.aborted"?: ( + input: { + sessionID: string + messageID: string + reason: string + partialContent: string + }, + output: { + retry: boolean + injectMessage: string + }, + ) => Promise }