Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions packages/opencode/src/bus/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export namespace Bus {
for (const sub of [...wildcard]) {
sub(event)
}
entry.subscriptions.clear()
},
)

Expand Down Expand Up @@ -102,4 +103,12 @@ export namespace Bus {
match.splice(index, 1)
}
}

export function debug() {
const counts: Record<string, number> = {}
for (const [type, subs] of state().subscriptions) {
counts[type] = subs.length
}
return { subscriptions: counts }
}
}
4 changes: 2 additions & 2 deletions packages/opencode/src/cli/cmd/acp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ export const AcpCommand = cmd({
log.info("setup connection")
process.stdin.resume()
await new Promise((resolve, reject) => {
process.stdin.on("end", resolve)
process.stdin.on("error", reject)
process.stdin.once("end", resolve)
process.stdin.once("error", reject)
})
})
},
Expand Down
12 changes: 8 additions & 4 deletions packages/opencode/src/cli/cmd/github.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { Bus } from "../../bus"
import { MessageV2 } from "../../session/message-v2"
import { SessionPrompt } from "@/session/prompt"
import { $ } from "bun"
import { setTimeout as sleep } from "node:timers/promises"

type GitHubAuthor = {
login: string
Expand Down Expand Up @@ -353,7 +354,7 @@ export const GithubInstallCommand = cmd({
}

retries++
await Bun.sleep(1000)
await sleep(1000)
} while (true)

s.stop("Installed GitHub app")
Expand Down Expand Up @@ -493,6 +494,7 @@ export const GithubRunCommand = cmd({
: "issue"
: undefined

let unsubSessionEvents: (() => void) | undefined
try {
if (useGithubToken) {
const githubToken = process.env["GITHUB_TOKEN"]
Expand Down Expand Up @@ -532,7 +534,7 @@ export const GithubRunCommand = cmd({
},
],
})
subscribeSessionEvents()
unsubSessionEvents = subscribeSessionEvents()
shareId = await (async () => {
if (share === false) return
if (!share && repoData.data.private) return
Expand Down Expand Up @@ -670,6 +672,7 @@ export const GithubRunCommand = cmd({
// Also output the clean error message for the action to capture
//core.setOutput("prepare_error", e.message);
} finally {
unsubSessionEvents?.()
if (!useGithubToken) {
await restoreGitConfig()
await revokeAppToken()
Expand Down Expand Up @@ -867,7 +870,7 @@ export const GithubRunCommand = cmd({
}

let text = ""
Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => {
const unsub = Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => {
if (evt.properties.part.sessionID !== session.id) return
//if (evt.properties.part.messageID === messageID) return
const part = evt.properties.part
Expand All @@ -894,6 +897,7 @@ export const GithubRunCommand = cmd({
}
}
})
return unsub
}

async function summarize(response: string) {
Expand Down Expand Up @@ -1372,7 +1376,7 @@ Co-authored-by: ${actor} <${actor}@users.noreply.github.com>"`
} catch (e) {
if (retries > 0) {
console.log(`Retrying after ${delayMs}ms...`)
await Bun.sleep(delayMs)
await sleep(delayMs)
return withRetry(fn, retries - 1, delayMs)
}
throw e
Expand Down
45 changes: 36 additions & 9 deletions packages/opencode/src/control-plane/workspace-server/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,48 @@ export function WorkspaceServerRoutes() {
data: JSON.stringify(event),
})
}

let done = false
let resolveWait: () => void = () => {}
let heartbeat: ReturnType<typeof setInterval> | undefined

const cleanup = () => {
if (done) return
done = true
clearInterval(heartbeat)
GlobalBus.off("event", handler)
resolveWait()
}

const handler = async (event: { directory?: string; payload: unknown }) => {
await send(event.payload)
if (done) return
try {
await send(event.payload)
} catch {
cleanup()
}
}

GlobalBus.on("event", handler)
await send({ type: "server.connected", properties: {} })
const heartbeat = setInterval(() => {
void send({ type: "server.heartbeat", properties: {} })

try {
await send({ type: "server.connected", properties: {} })
} catch {
cleanup()
}

heartbeat = setInterval(async () => {
if (done) return
try {
await send({ type: "server.heartbeat", properties: {} })
} catch {
cleanup()
}
}, 10_000)

await new Promise<void>((resolve) => {
stream.onAbort(() => {
clearInterval(heartbeat)
GlobalBus.off("event", handler)
resolve()
})
resolveWait = resolve
stream.onAbort(cleanup)
})
})
})
Expand Down
4 changes: 4 additions & 0 deletions packages/opencode/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { AcpCommand } from "./cli/cmd/acp"
import { EOL } from "os"
import { WebCommand } from "./cli/cmd/web"
import { PrCommand } from "./cli/cmd/pr"
import { Instance } from "./project/instance"
import { SessionCommand } from "./cli/cmd/session"
import { DbCommand } from "./cli/cmd/db"
import path from "path"
Expand Down Expand Up @@ -203,6 +204,9 @@ try {
}
process.exitCode = 1
} finally {
// Clean up instances (PTY, MCP, LSP) before exiting.
// Timeout prevents hanging on unresponsive subprocesses.
await Promise.race([Instance.disposeAll(), new Promise((resolve) => setTimeout(resolve, 5000))])
// Some subprocesses don't react properly to SIGTERM and similar signals.
// Most notably, some docker-container-based MCP servers don't handle such signals unless
// run using `docker run --init`.
Expand Down
2 changes: 2 additions & 0 deletions packages/opencode/src/lsp/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ export namespace LSPClient {
},
async shutdown() {
l.info("shutting down")
diagnostics.clear()
for (const key of Object.keys(files)) delete files[key]
connection.end()
connection.dispose()
input.server.process.kill()
Expand Down
7 changes: 7 additions & 0 deletions packages/opencode/src/project/instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ export const Instance = {
state<S>(init: () => S, dispose?: (state: Awaited<S>) => Promise<void>): () => S {
return State.create(() => Instance.directory, init, dispose)
},
debug() {
return {
cacheSize: cache.size,
cacheKeys: [...cache.keys()],
state: State.debug(),
}
},
async dispose() {
Log.Default.info("disposing instance", { directory: Instance.directory })
await State.dispose(Instance.directory)
Expand Down
8 changes: 8 additions & 0 deletions packages/opencode/src/project/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,12 @@ export namespace State {
disposalFinished = true
log.info("state disposal completed", { key })
}

export function debug() {
const result: Record<string, number> = {}
for (const [key, entries] of recordsByKey) {
result[key] = entries.size
}
return { keys: recordsByKey.size, entries: result }
}
}
57 changes: 38 additions & 19 deletions packages/opencode/src/server/routes/global.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,40 +69,59 @@ export const GlobalRoutes = lazy(() =>
c.header("X-Accel-Buffering", "no")
c.header("X-Content-Type-Options", "nosniff")
return streamSSE(c, async (stream) => {
stream.writeSSE({
await stream.writeSSE({
data: JSON.stringify({
payload: {
type: "server.connected",
properties: {},
},
}),
})

let done = false
let resolveWait: () => void = () => {}
let heartbeat: ReturnType<typeof setInterval> | undefined

const cleanup = () => {
if (done) return
done = true
clearInterval(heartbeat)
GlobalBus.off("event", handler)
resolveWait()
log.info("global event disconnected")
}

async function handler(event: any) {
await stream.writeSSE({
data: JSON.stringify(event),
})
if (done) return
try {
await stream.writeSSE({ data: JSON.stringify(event) })
} catch {
cleanup()
}
}

GlobalBus.on("event", handler)

// Send heartbeat every 10s to prevent stalled proxy streams.
const heartbeat = setInterval(() => {
stream.writeSSE({
data: JSON.stringify({
payload: {
type: "server.heartbeat",
properties: {},
},
}),
})
heartbeat = setInterval(async () => {
if (done) return
try {
await stream.writeSSE({
data: JSON.stringify({
payload: {
type: "server.heartbeat",
properties: {},
},
}),
})
} catch {
cleanup()
}
}, 10_000)

await new Promise<void>((resolve) => {
stream.onAbort(() => {
clearInterval(heartbeat)
GlobalBus.off("event", handler)
resolve()
log.info("global event disconnected")
})
resolveWait = resolve
stream.onAbort(cleanup)
})
})
},
Expand Down
Loading
Loading