diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index edb093f1974..46a3a1508c2 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -35,6 +35,7 @@ export namespace Bus { for (const sub of [...wildcard]) { sub(event) } + entry.subscriptions.clear() }, ) @@ -102,4 +103,12 @@ export namespace Bus { match.splice(index, 1) } } + + export function debug() { + const counts: Record = {} + for (const [type, subs] of state().subscriptions) { + counts[type] = subs.length + } + return { subscriptions: counts } + } } diff --git a/packages/opencode/src/cli/cmd/acp.ts b/packages/opencode/src/cli/cmd/acp.ts index 99a9a81ab9c..f55974e6699 100644 --- a/packages/opencode/src/cli/cmd/acp.ts +++ b/packages/opencode/src/cli/cmd/acp.ts @@ -62,8 +62,8 @@ export const AcpCommand = cmd({ log.info("setup connection") process.stdin.resume() await new Promise((resolve, reject) => { - process.stdin.on("end", resolve) - process.stdin.on("error", reject) + process.stdin.once("end", resolve) + process.stdin.once("error", reject) }) }) }, diff --git a/packages/opencode/src/cli/cmd/github.ts b/packages/opencode/src/cli/cmd/github.ts index 672e73d49a9..970be68c451 100644 --- a/packages/opencode/src/cli/cmd/github.ts +++ b/packages/opencode/src/cli/cmd/github.ts @@ -28,6 +28,7 @@ import { Bus } from "../../bus" import { MessageV2 } from "../../session/message-v2" import { SessionPrompt } from "@/session/prompt" import { $ } from "bun" +import { setTimeout as sleep } from "node:timers/promises" type GitHubAuthor = { login: string @@ -353,7 +354,7 @@ export const GithubInstallCommand = cmd({ } retries++ - await Bun.sleep(1000) + await sleep(1000) } while (true) s.stop("Installed GitHub app") @@ -493,6 +494,7 @@ export const GithubRunCommand = cmd({ : "issue" : undefined + let unsubSessionEvents: (() => void) | undefined try { if (useGithubToken) { const githubToken = process.env["GITHUB_TOKEN"] @@ -532,7 +534,7 @@ export const 
GithubRunCommand = cmd({ }, ], }) - subscribeSessionEvents() + unsubSessionEvents = subscribeSessionEvents() shareId = await (async () => { if (share === false) return if (!share && repoData.data.private) return @@ -670,6 +672,7 @@ export const GithubRunCommand = cmd({ // Also output the clean error message for the action to capture //core.setOutput("prepare_error", e.message); } finally { + unsubSessionEvents?.() if (!useGithubToken) { await restoreGitConfig() await revokeAppToken() @@ -867,7 +870,7 @@ export const GithubRunCommand = cmd({ } let text = "" - Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { + const unsub = Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => { if (evt.properties.part.sessionID !== session.id) return //if (evt.properties.part.messageID === messageID) return const part = evt.properties.part @@ -894,6 +897,7 @@ export const GithubRunCommand = cmd({ } } }) + return unsub } async function summarize(response: string) { @@ -1372,7 +1376,7 @@ Co-authored-by: ${actor} <${actor}@users.noreply.github.com>"` } catch (e) { if (retries > 0) { console.log(`Retrying after ${delayMs}ms...`) - await Bun.sleep(delayMs) + await sleep(delayMs) return withRetry(fn, retries - 1, delayMs) } throw e diff --git a/packages/opencode/src/control-plane/workspace-server/routes.ts b/packages/opencode/src/control-plane/workspace-server/routes.ts index 353e5d50af0..3f3a46c671e 100644 --- a/packages/opencode/src/control-plane/workspace-server/routes.ts +++ b/packages/opencode/src/control-plane/workspace-server/routes.ts @@ -12,21 +12,48 @@ export function WorkspaceServerRoutes() { data: JSON.stringify(event), }) } + + let done = false + let resolveWait: () => void = () => {} + let heartbeat: ReturnType | undefined + + const cleanup = () => { + if (done) return + done = true + clearInterval(heartbeat) + GlobalBus.off("event", handler) + resolveWait() + } + const handler = async (event: { directory?: string; payload: unknown }) => { - await 
send(event.payload) + if (done) return + try { + await send(event.payload) + } catch { + cleanup() + } } + GlobalBus.on("event", handler) - await send({ type: "server.connected", properties: {} }) - const heartbeat = setInterval(() => { - void send({ type: "server.heartbeat", properties: {} }) + + try { + await send({ type: "server.connected", properties: {} }) + } catch { + cleanup() + } + + heartbeat = setInterval(async () => { + if (done) return + try { + await send({ type: "server.heartbeat", properties: {} }) + } catch { + cleanup() + } }, 10_000) await new Promise((resolve) => { - stream.onAbort(() => { - clearInterval(heartbeat) - GlobalBus.off("event", handler) - resolve() - }) + resolveWait = resolve + stream.onAbort(cleanup) }) }) }) diff --git a/packages/opencode/src/index.ts b/packages/opencode/src/index.ts index 35b42dce77c..3cd6a326b26 100644 --- a/packages/opencode/src/index.ts +++ b/packages/opencode/src/index.ts @@ -27,6 +27,7 @@ import { AcpCommand } from "./cli/cmd/acp" import { EOL } from "os" import { WebCommand } from "./cli/cmd/web" import { PrCommand } from "./cli/cmd/pr" +import { Instance } from "./project/instance" import { SessionCommand } from "./cli/cmd/session" import { DbCommand } from "./cli/cmd/db" import path from "path" @@ -203,6 +204,9 @@ try { } process.exitCode = 1 } finally { + // Clean up instances (PTY, MCP, LSP) before exiting. + // Timeout prevents hanging on unresponsive subprocesses. + await Promise.race([Instance.disposeAll(), new Promise((resolve) => setTimeout(resolve, 5000))]) // Some subprocesses don't react properly to SIGTERM and similar signals. // Most notably, some docker-container-based MCP servers don't handle such signals unless // run using `docker run --init`. 
diff --git a/packages/opencode/src/lsp/client.ts b/packages/opencode/src/lsp/client.ts index 084ccf831ee..c5af356cb9f 100644 --- a/packages/opencode/src/lsp/client.ts +++ b/packages/opencode/src/lsp/client.ts @@ -237,6 +237,8 @@ export namespace LSPClient { }, async shutdown() { l.info("shutting down") + diagnostics.clear() + for (const key of Object.keys(files)) delete files[key] connection.end() connection.dispose() input.server.process.kill() diff --git a/packages/opencode/src/project/instance.ts b/packages/opencode/src/project/instance.ts index 98031f18d3f..9fae62a30b2 100644 --- a/packages/opencode/src/project/instance.ts +++ b/packages/opencode/src/project/instance.ts @@ -66,6 +66,13 @@ export const Instance = { state(init: () => S, dispose?: (state: Awaited) => Promise): () => S { return State.create(() => Instance.directory, init, dispose) }, + debug() { + return { + cacheSize: cache.size, + cacheKeys: [...cache.keys()], + state: State.debug(), + } + }, async dispose() { Log.Default.info("disposing instance", { directory: Instance.directory }) await State.dispose(Instance.directory) diff --git a/packages/opencode/src/project/state.ts b/packages/opencode/src/project/state.ts index a9dce565b5e..7749bca5743 100644 --- a/packages/opencode/src/project/state.ts +++ b/packages/opencode/src/project/state.ts @@ -67,4 +67,12 @@ export namespace State { disposalFinished = true log.info("state disposal completed", { key }) } + + export function debug() { + const result: Record = {} + for (const [key, entries] of recordsByKey) { + result[key] = entries.size + } + return { keys: recordsByKey.size, entries: result } + } } diff --git a/packages/opencode/src/server/routes/global.ts b/packages/opencode/src/server/routes/global.ts index 4d019f6a7ee..2fe6d7b92ae 100644 --- a/packages/opencode/src/server/routes/global.ts +++ b/packages/opencode/src/server/routes/global.ts @@ -69,7 +69,7 @@ export const GlobalRoutes = lazy(() => c.header("X-Accel-Buffering", "no") 
c.header("X-Content-Type-Options", "nosniff") return streamSSE(c, async (stream) => { - stream.writeSSE({ + await stream.writeSSE({ data: JSON.stringify({ payload: { type: "server.connected", @@ -77,32 +77,51 @@ export const GlobalRoutes = lazy(() => }, }), }) + + let done = false + let resolveWait: () => void = () => {} + let heartbeat: ReturnType | undefined + + const cleanup = () => { + if (done) return + done = true + clearInterval(heartbeat) + GlobalBus.off("event", handler) + resolveWait() + log.info("global event disconnected") + } + async function handler(event: any) { - await stream.writeSSE({ - data: JSON.stringify(event), - }) + if (done) return + try { + await stream.writeSSE({ data: JSON.stringify(event) }) + } catch { + cleanup() + } } + GlobalBus.on("event", handler) // Send heartbeat every 10s to prevent stalled proxy streams. - const heartbeat = setInterval(() => { - stream.writeSSE({ - data: JSON.stringify({ - payload: { - type: "server.heartbeat", - properties: {}, - }, - }), - }) + heartbeat = setInterval(async () => { + if (done) return + try { + await stream.writeSSE({ + data: JSON.stringify({ + payload: { + type: "server.heartbeat", + properties: {}, + }, + }), + }) + } catch { + cleanup() + } }, 10_000) await new Promise((resolve) => { - stream.onAbort(() => { - clearInterval(heartbeat) - GlobalBus.off("event", handler) - resolve() - log.info("global event disconnected") - }) + resolveWait = resolve + stream.onAbort(cleanup) }) }) }, diff --git a/packages/opencode/src/server/server.ts b/packages/opencode/src/server/server.ts index 85049650c1f..1cd471f6865 100644 --- a/packages/opencode/src/server/server.ts +++ b/packages/opencode/src/server/server.ts @@ -1,5 +1,6 @@ import { BusEvent } from "@/bus/bus-event" import { Bus } from "@/bus" +import { GlobalBus } from "@/bus/global" import { Log } from "../util/log" import { describeRoute, generateSpecs, validator, resolver, openAPIRouteHandler } from "hono-openapi" import { Hono } from "hono" @@ 
-41,6 +42,9 @@ import { QuestionRoutes } from "./routes/question" import { PermissionRoutes } from "./routes/permission" import { GlobalRoutes } from "./routes/global" import { MDNS } from "./mdns" +import { Diagnostics } from "../util/diagnostics" +import { SessionStatus } from "../session/status" +import { Pty } from "../pty" // @ts-ignore This global is needed to prevent ai-sdk from logging warnings to stdout https://github.com/vercel/ai/blob/2dc67e0ef538307f21368db32d5a12345d98831b/packages/ai/src/logger/log-warnings.ts#L85 globalThis.AI_SDK_LOG_WARNINGS = false @@ -520,42 +524,85 @@ export namespace Server { c.header("X-Accel-Buffering", "no") c.header("X-Content-Type-Options", "nosniff") return streamSSE(c, async (stream) => { - stream.writeSSE({ + await stream.writeSSE({ data: JSON.stringify({ type: "server.connected", properties: {}, }), }) - const unsub = Bus.subscribeAll(async (event) => { - await stream.writeSSE({ - data: JSON.stringify(event), - }) - if (event.type === Bus.InstanceDisposed.type) { - stream.close() + + let done = false + let unsub: () => void = () => {} + let resolveWait: () => void = () => {} + let heartbeat: ReturnType | undefined + + const cleanup = () => { + if (done) return + done = true + clearInterval(heartbeat) + unsub() + resolveWait() + log.info("event disconnected") + } + + unsub = Bus.subscribeAll(async (event) => { + if (done) return + try { + await stream.writeSSE({ data: JSON.stringify(event) }) + if (event.type === Bus.InstanceDisposed.type) { + cleanup() + stream.close() + } + } catch { + cleanup() } }) // Send heartbeat every 10s to prevent stalled proxy streams. 
- const heartbeat = setInterval(() => { - stream.writeSSE({ - data: JSON.stringify({ - type: "server.heartbeat", - properties: {}, - }), - }) + heartbeat = setInterval(async () => { + if (done) return + try { + await stream.writeSSE({ + data: JSON.stringify({ + type: "server.heartbeat", + properties: {}, + }), + }) + } catch { + cleanup() + } }, 10_000) await new Promise((resolve) => { - stream.onAbort(() => { - clearInterval(heartbeat) - unsub() - resolve() - log.info("event disconnected") - }) + resolveWait = resolve + stream.onAbort(cleanup) }) }) }, ) + .get("/debug/memory", async (c) => { + const data = await Diagnostics.report() + return c.json({ + ...data, + collections: { + globalBus: { + eventListeners: GlobalBus.listenerCount("event"), + }, + bus: Bus.debug(), + instance: Instance.debug(), + sessionStatus: Object.keys(SessionStatus.list()).length, + pty: Pty.list().length, + }, + }) + }) + .post("/debug/snapshot", async (c) => { + const snapshotPath = await Diagnostics.snapshot() + return c.json({ path: snapshotPath }) + }) + .post("/debug/dump", async (c) => { + const result = await Diagnostics.dump() + return c.json(result) + }) .all("/*", async (c) => { const path = c.req.path @@ -597,6 +644,7 @@ export namespace Server { cors?: string[] }) { _corsWhitelist = opts.cors ?? 
[] + Diagnostics.init() const args = { hostname: opts.hostname,
diff --git a/packages/opencode/src/util/diagnostics.ts b/packages/opencode/src/util/diagnostics.ts
new file mode 100644
index 00000000000..3f569c031e2
--- /dev/null
+++ b/packages/opencode/src/util/diagnostics.ts
@@ -0,0 +1,303 @@
+import path from "path"
+import os from "os"
+import fs from "fs/promises"
+import { Log } from "./log"
+import { Global } from "../global"
+import { GlobalBus } from "../bus/global"
+import { Bus } from "../bus"
+import { Instance } from "../project/instance"
+import { SessionStatus } from "../session/status"
+import { Pty } from "../pty"
+
+/**
+ * Process memory diagnostics: on-demand reports and heap snapshots, plus an
+ * opt-in RSS watchdog (OPENCODE_DIAGNOSTICS) that logs warnings on high memory
+ * use and shuts the process down once a hard limit is crossed.
+ */
+export namespace Diagnostics {
+  const log = Log.create({ service: "diagnostics" })
+  // Directory that reports and heap snapshots are written to.
+  const dir = path.join(Global.Path.data, "diagnostics")
+
+  const GIB = 1024 ** 3
+  const MIB = 1024 ** 2
+  // RSS, in GiB, at which the watchdog starts logging warnings.
+  const WARN_GB = 2
+  // Interval between watchdog samples.
+  const INTERVAL_MS = 30_000
+  // Minimum spacing between automatic report dumps while RSS stays high.
+  const SAMPLE_COOLDOWN_MS = 60_000
+  // Upper bound on instance disposal during an emergency shutdown.
+  const DISPOSE_TIMEOUT_MS = 5_000
+
+  let timer: ReturnType<typeof setInterval> | undefined
+  let killing = false
+  let lastSample = 0
+  let signalRegistered = false
+
+  // Built-in kill limit: a quarter of total memory, clamped to 2-4 GiB.
+  function defaultThreshold() {
+    return Math.max(2 * GIB, Math.min(0.25 * os.totalmem(), 4 * GIB))
+  }
+
+  /**
+   * RSS limit in bytes past which the watchdog shuts the process down.
+   * OPENCODE_MEMORY_LIMIT (in GiB) overrides the built-in default. A value that
+   * is not a positive finite number is ignored: NaN would make every comparison
+   * false and silently disable the limit, and zero or less would trip it on the
+   * first check.
+   */
+  function threshold() {
+    const env = process.env.OPENCODE_MEMORY_LIMIT
+    if (!env) return defaultThreshold()
+    const parsed = Number.parseFloat(env)
+    if (Number.isFinite(parsed) && parsed > 0) return parsed * GIB
+    return defaultThreshold()
+  }
+
+  function rss() {
+    return process.memoryUsage.rss()
+  }
+
+  // Bytes formatted as GiB with two decimals.
+  function gb(bytes: number) {
+    return (bytes / GIB).toFixed(2)
+  }
+
+  // Bytes formatted as MiB with one decimal.
+  function mb(bytes: number) {
+    return (bytes / MIB).toFixed(1)
+  }
+
+  // JavaScriptCore heap statistics; bun:jsc is loaded through a dynamic import.
+  async function heap() {
+    const jsc = await import("bun:jsc")
+    return jsc.heapStats()
+  }
+
+  // The n largest entries of a type -> count map, largest first.
+  function topTypes(counts: Record<string, number>, n = 15) {
+    return Object.entries(counts)
+      .sort(([, a], [, b]) => b - a)
+      .slice(0, n)
+      .map(([type, count]) => ({ type, count }))
+  }
+
+  /** Point-in-time summary of process memory and JavaScriptCore heap statistics. */
+  export async function report() {
+    const mem = process.memoryUsage()
+    const stats = await heap()
+    return {
+      ts: Date.now(),
+      pid: process.pid,
+      rss: { bytes: mem.rss, mb: mb(mem.rss), gb: gb(mem.rss) },
+      heap: {
+        used: { bytes: mem.heapUsed, mb: mb(mem.heapUsed) },
+        total: { bytes: mem.heapTotal, mb: mb(mem.heapTotal) },
+        external: { bytes: mem.external, mb: mb(mem.external) },
+      },
+      jsc: {
+        heapSize: stats.heapSize,
+        heapCapacity: stats.heapCapacity,
+        extraMemorySize: stats.extraMemorySize,
+        objectCount: stats.objectCount,
+        protectedObjectCount: stats.protectedObjectCount,
+        globalObjectCount: stats.globalObjectCount,
+        protectedGlobalObjectCount: stats.protectedGlobalObjectCount,
+        topObjectTypes: topTypes(stats.objectTypeCounts),
+        topProtectedTypes: topTypes(stats.protectedObjectTypeCounts),
+      },
+    }
+  }
+
+  // Sizes of the in-process collections included in dumped reports.
+  function collections() {
+    return {
+      globalBus: { eventListeners: GlobalBus.listenerCount("event") },
+      bus: Bus.debug(),
+      instance: Instance.debug(),
+      sessionStatus: Object.keys(SessionStatus.list()).length,
+      pty: Pty.list().length,
+    }
+  }
+
+  /**
+   * Runs a full GC, then writes a V8-format heap snapshot into the diagnostics
+   * directory.
+   * @returns path of the written snapshot file
+   */
+  export async function snapshot() {
+    await fs.mkdir(dir, { recursive: true })
+    const jsc = await import("bun:jsc")
+    jsc.fullGC()
+    jsc.releaseWeakRefs()
+    const name = `heap-${Date.now()}.heapsnapshot`
+    const dest = path.join(dir, name)
+    const data = Bun.generateHeapSnapshot("v8")
+    await Bun.write(dest, data)
+    log.info("heap snapshot written", { path: dest })
+    return dest
+  }
+
+  /**
+   * Writes report() plus collection sizes as JSON into the diagnostics directory.
+   * @returns the report data and the path it was written to
+   */
+  export async function dump() {
+    await fs.mkdir(dir, { recursive: true })
+    const data = { ...(await report()), collections: collections() }
+    const name = `report-${Date.now()}.json`
+    const dest = path.join(dir, name)
+    await Bun.write(dest, JSON.stringify(data, null, 2))
+    log.info("diagnostic report written", { path: dest })
+    return { report: data, path: dest }
+  }
+
+  // Disposes every instance, giving up after DISPOSE_TIMEOUT_MS so that an
+  // unresponsive subprocess cannot stall the emergency exit.
+  async function disposeInstances() {
+    const disposal = Instance.disposeAll().catch((err) => {
+      log.error("failed to dispose instances", { error: String(err) })
+    })
+    const timeout = new Promise<void>((resolve) => setTimeout(resolve, DISPOSE_TIMEOUT_MS))
+    await Promise.race([disposal, timeout])
+  }
+
+  // Emergency shutdown: notify the UI, persist a report and heap snapshot on a
+  // best-effort basis, dispose instances, then exit with status 1.
+  async function kill(bytes: number, limit: number) {
+    killing = true
+    stop()
+    log.error("memory critical — initiating emergency shutdown", {
+      rss_gb: gb(bytes),
+      kill_threshold_gb: gb(limit),
+      pid: process.pid,
+    })
+    GlobalBus.emit("event", {
+      payload: {
+        type: "tui.toast.show",
+        properties: {
+          title: "Memory Critical",
+          message: `RSS at ${gb(bytes)}GB. Dumping diagnostics to ${dir} and exiting.`,
+          variant: "error" as const,
+          duration: 10000,
+        },
+      },
+    })
+    const dumpResult = await dump().catch((err) => {
+      log.error("failed to write diagnostic report", { error: String(err) })
+      return undefined
+    })
+    const snapshotPath = await snapshot().catch((err) => {
+      log.error("failed to write heap snapshot", { error: String(err) })
+      return undefined
+    })
+    log.error("pre-shutdown diagnostics", {
+      report: dumpResult?.path,
+      snapshot: snapshotPath,
+    })
+    // NOTE(review): presumably leaves time for the toast and logs to be delivered — confirm.
+    await Bun.sleep(2000)
+    await disposeInstances()
+    process.exit(1)
+  }
+
+  // One watchdog pass: enforce the kill limit, otherwise log (and periodically
+  // dump) diagnostics once RSS reaches WARN_GB.
+  async function check() {
+    if (killing) return
+    Bun.gc(true)
+    const bytes = rss()
+    const limit = threshold()
+    // Test the kill limit before the warning floor so that a limit configured
+    // below WARN_GB is still enforced.
+    if (bytes >= limit) return kill(bytes, limit)
+    if (bytes / GIB < WARN_GB) return
+    log.warn("high memory usage detected", {
+      rss_gb: gb(bytes),
+      kill_threshold_gb: gb(limit),
+      pid: process.pid,
+    })
+    const stats = await heap()
+    log.warn("heap stats", {
+      objectCount: stats.objectCount,
+      protectedObjectCount: stats.protectedObjectCount,
+      heapSize: mb(stats.heapSize),
+      topTypes: topTypes(stats.objectTypeCounts, 5),
+    })
+    const now = Date.now()
+    if (now - lastSample >= SAMPLE_COOLDOWN_MS) {
+      lastSample = now
+      await dump().catch((err) => {
+        log.warn("failed to write diagnostic sample", { error: String(err) })
+      })
+    }
+  }
+
+  // SIGUSR1 action: write a report and a heap snapshot and log where they went.
+  async function capture() {
+    log.info("SIGUSR1 received — capturing diagnostics")
+    const { report: data, path: reportPath } = await dump()
+    log.info("report summary", {
+      rss_gb: data.rss.gb,
+      objectCount: data.jsc.objectCount,
+      path: reportPath,
+    })
+    const snapshotPath = await snapshot()
+    log.info("diagnostic capture complete", {
+      report: reportPath,
+      snapshot: snapshotPath,
+    })
+  }
+
+  /**
+   * Registers the SIGUSR1 capture handler and, when OPENCODE_DIAGNOSTICS is set,
+   * starts the RSS watchdog. Safe to call repeatedly: the signal handler is
+   * registered once and a previously started watchdog timer is replaced.
+   */
+  export function init() {
+    if (!signalRegistered) {
+      signalRegistered = true
+      process.on("SIGUSR1", () => {
+        // A failed capture must not surface as an unhandled rejection.
+        capture().catch((err) => {
+          log.error("diagnostic capture failed", { error: String(err) })
+        })
+      })
+    }
+
+    if (!process.env.OPENCODE_DIAGNOSTICS) {
+      log.info("diagnostics available", {
+        signal: "SIGUSR1",
+        enable: "OPENCODE_DIAGNOSTICS=1",
+        output: dir,
+      })
+      return
+    }
+
+    stop()
+    timer = setInterval(() => {
+      check().catch((err) => {
+        log.error("memory check failed", { error: String(err) })
+      })
+    }, INTERVAL_MS)
+    timer.unref()
+
+    const limit = threshold()
+    log.info("diagnostics monitoring active", {
+      signal: "SIGUSR1",
+      warn_gb: WARN_GB,
+      kill_threshold_gb: gb(limit),
+      interval_ms: INTERVAL_MS,
+      output: dir,
+    })
+  }
+
+  /** Stops the RSS watchdog if it is running; the SIGUSR1 handler stays registered. */
+  export function stop() {
+    if (timer) {
+      clearInterval(timer)
+      timer = undefined
+    }
+  }
+}