diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index edb093f1974..625f2966227 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -51,8 +51,8 @@ export namespace Bus { }) const pending = [] for (const key of [def.type, "*"]) { - const match = state().subscriptions.get(key) - for (const sub of match ?? []) { + const match = [...(state().subscriptions.get(key) ?? [])] + for (const sub of match) { pending.push(sub(payload)) } } diff --git a/packages/opencode/src/server/routes/event.ts b/packages/opencode/src/server/routes/event.ts new file mode 100644 index 00000000000..f34ff056679 --- /dev/null +++ b/packages/opencode/src/server/routes/event.ts @@ -0,0 +1,85 @@ +import { Hono } from "hono" +import { describeRoute, resolver } from "hono-openapi" +import { streamSSE } from "hono/streaming" +import { Log } from "@/util/log" +import { BusEvent } from "@/bus/bus-event" +import { Bus } from "@/bus" +import { lazy } from "../../util/lazy" +import { AsyncQueue } from "../../util/queue" +import { Instance } from "@/project/instance" + +const log = Log.create({ service: "server" }) + +export const EventRoutes = lazy(() => + new Hono().get( + "/event", + describeRoute({ + summary: "Subscribe to events", + description: "Get events", + operationId: "event.subscribe", + responses: { + 200: { + description: "Event stream", + content: { + "text/event-stream": { + schema: resolver(BusEvent.payloads()), + }, + }, + }, + }, + }), + async (c) => { + log.info("event connected") + c.header("X-Accel-Buffering", "no") + c.header("X-Content-Type-Options", "nosniff") + return streamSSE(c, async (stream) => { + const q = new AsyncQueue() + let done = false + + q.push( + JSON.stringify({ + type: "server.connected", + properties: {}, + }), + ) + + // Send heartbeat every 10s to prevent stalled proxy streams. + const heartbeat = setInterval(() => { + q.push( + JSON.stringify({ + type: "server.heartbeat", + properties: {}, + }), + ) + }, 10_000) + + const unsub = Bus.subscribeAll((event) => { + q.push(JSON.stringify(event)) + if (event.type === Bus.InstanceDisposed.type) { + stop() + } + }) + + const stop = () => { + if (done) return + done = true + clearInterval(heartbeat) + unsub() + q.push(null) + log.info("event disconnected") + } + + stream.onAbort(stop) + + try { + for await (const data of q) { + if (data === null) return + await stream.writeSSE({ data }) + } + } finally { + stop() + } + }) + }, + ), +) diff --git a/packages/opencode/src/server/routes/global.ts b/packages/opencode/src/server/routes/global.ts index 4d019f6a7ee..4a6a3ebc7e2 100644 --- a/packages/opencode/src/server/routes/global.ts +++ b/packages/opencode/src/server/routes/global.ts @@ -4,6 +4,7 @@ import { streamSSE } from "hono/streaming" import z from "zod" import { BusEvent } from "@/bus/bus-event" import { GlobalBus } from "@/bus/global" +import { AsyncQueue } from "@/util/queue" import { Instance } from "../../project/instance" import { Installation } from "@/installation" import { Log } from "../../util/log" @@ -69,41 +70,54 @@ export const GlobalRoutes = lazy(() => c.header("X-Accel-Buffering", "no") c.header("X-Content-Type-Options", "nosniff") return streamSSE(c, async (stream) => { - stream.writeSSE({ - data: JSON.stringify({ + const q = new AsyncQueue() + let done = false + + q.push( + JSON.stringify({ payload: { type: "server.connected", properties: {}, }, }), - }) - async function handler(event: any) { - await stream.writeSSE({ - data: JSON.stringify(event), - }) - } - GlobalBus.on("event", handler) + ) // Send heartbeat every 10s to prevent stalled proxy streams. const heartbeat = setInterval(() => { - stream.writeSSE({ - data: JSON.stringify({ + q.push( + JSON.stringify({ payload: { type: "server.heartbeat", properties: {}, }, }), - }) + ) }, 10_000) - await new Promise((resolve) => { - stream.onAbort(() => { - clearInterval(heartbeat) - GlobalBus.off("event", handler) - resolve() - log.info("global event disconnected") - }) - }) + async function handler(event: any) { + q.push(JSON.stringify(event)) + } + GlobalBus.on("event", handler) + + const stop = () => { + if (done) return + done = true + clearInterval(heartbeat) + GlobalBus.off("event", handler) + q.push(null) + log.info("event disconnected") + } + + stream.onAbort(stop) + + try { + for await (const data of q) { + if (data === null) return + await stream.writeSSE({ data }) + } + } finally { + stop() + } }) }, ) diff --git a/packages/opencode/src/server/server.ts b/packages/opencode/src/server/server.ts index c485654fdf8..a68becb1fba 100644 --- a/packages/opencode/src/server/server.ts +++ b/packages/opencode/src/server/server.ts @@ -1,10 +1,7 @@ -import { BusEvent } from "@/bus/bus-event" -import { Bus } from "@/bus" import { Log } from "../util/log" import { describeRoute, generateSpecs, validator, resolver, openAPIRouteHandler } from "hono-openapi" import { Hono } from "hono" import { cors } from "hono/cors" -import { streamSSE } from "hono/streaming" import { proxy } from "hono/proxy" import { basicAuth } from "hono/basic-auth" import z from "zod" @@ -34,6 +31,7 @@ import { FileRoutes } from "./routes/file" import { ConfigRoutes } from "./routes/config" import { ExperimentalRoutes } from "./routes/experimental" import { ProviderRoutes } from "./routes/provider" +import { EventRoutes } from "./routes/event" import { InstanceBootstrap } from "../project/bootstrap" import { NotFoundError } from "../storage/db" import type { ContentfulStatusCode } from "hono/utils/http-status" @@ -251,6 +249,7 @@ export namespace Server { .route("/question", QuestionRoutes()) .route("/provider", ProviderRoutes()) .route("/", FileRoutes()) + .route("/", EventRoutes()) .route("/mcp", McpRoutes()) .route("/tui", TuiRoutes()) .post( @@ -498,64 +497,6 @@ export namespace Server { return c.json(await Format.status()) }, ) - .get( - "/event", - describeRoute({ - summary: "Subscribe to events", - description: "Get events", - operationId: "event.subscribe", - responses: { - 200: { - description: "Event stream", - content: { - "text/event-stream": { - schema: resolver(BusEvent.payloads()), - }, - }, - }, - }, - }), - async (c) => { - log.info("event connected") - c.header("X-Accel-Buffering", "no") - c.header("X-Content-Type-Options", "nosniff") - return streamSSE(c, async (stream) => { - stream.writeSSE({ - data: JSON.stringify({ - type: "server.connected", - properties: {}, - }), - }) - const unsub = Bus.subscribeAll(async (event) => { - await stream.writeSSE({ - data: JSON.stringify(event), - }) - if (event.type === Bus.InstanceDisposed.type) { - stream.close() - } - }) - - // Send heartbeat every 10s to prevent stalled proxy streams. - const heartbeat = setInterval(() => { - stream.writeSSE({ - data: JSON.stringify({ - type: "server.heartbeat", - properties: {}, - }), - }) - }, 10_000) - - await new Promise((resolve) => { - stream.onAbort(() => { - clearInterval(heartbeat) - unsub() - resolve() - log.info("event disconnected") - }) - }) - }) - }, - ) .all("/*", async (c) => { const path = c.req.path