diff --git a/assistant/src/config/schemas/platform.ts b/assistant/src/config/schemas/platform.ts index da355bc3643..13af298771b 100644 --- a/assistant/src/config/schemas/platform.ts +++ b/assistant/src/config/schemas/platform.ts @@ -87,6 +87,14 @@ export const DaemonConfigSchema = z .describe( "Whether the daemon records conversations even when no client is connected", ), + reapOrphanedSubprocesses: z + .boolean({ + error: "daemon.reapOrphanedSubprocesses must be a boolean", + }) + .default(false) + .describe( + "Whether the daemon, when running as PID 1 in a container, periodically reaps orphaned subprocesses that reparented to it. Off by default while the behavior is being validated.", + ), }) .describe("Background daemon process configuration"); diff --git a/assistant/src/daemon/lifecycle.ts b/assistant/src/daemon/lifecycle.ts index 99d981150b3..4cfea1e6939 100644 --- a/assistant/src/daemon/lifecycle.ts +++ b/assistant/src/daemon/lifecycle.ts @@ -117,6 +117,7 @@ import { maybeRebuildMemoryV2Concepts, rebuildBm25CorpusStatsAndReseedSkills, } from "./memory-v2-startup.js"; +import { startOrphanReaper, stopOrphanReaper } from "./orphan-reaper.js"; import { processMessage } from "./process-message.js"; import { runProfilerSweep } from "./profiler-run-store.js"; import { @@ -728,6 +729,7 @@ export async function runDaemon(): Promise { await server.start(); log.info("Daemon startup: DaemonServer started"); startDiskPressureGuardForLifecycle(); + startOrphanReaper(); // Kick off the update bulletin background job AFTER `server.start()` // resolves. 
The conversation store must be initialized before wake @@ -1324,6 +1326,7 @@ export async function runDaemon(): Promise { cleanupPidFile: () => { stopGatewayFlagListener(); stopDiskPressureGuardForLifecycle(); + stopOrphanReaper(); cleanupPidFile(); }, }); @@ -1338,6 +1341,7 @@ export async function runDaemon(): Promise { } catch (err) { log.error({ err }, "Daemon startup failed — cleaning up"); stopDiskPressureGuardForLifecycle(); + stopOrphanReaper(); cleanupPidFileIfOwner(process.pid); throw err; }
diff --git a/assistant/src/daemon/orphan-reaper.test.ts b/assistant/src/daemon/orphan-reaper.test.ts
new file mode 100644
index 00000000000..a56e90e5707
--- /dev/null
+++ b/assistant/src/daemon/orphan-reaper.test.ts
@@ -0,0 +1,218 @@
+/**
+ * Tests for the orphan subprocess reaper.
+ *
+ * - `parseProcStat` must read state/ppid correctly even when the executable
+ *   name (`comm`) contains spaces and parentheses.
+ * - `selectReapable` must defer a zombie for one full scan interval (so libuv
+ *   reaps its own tracked children first) and reap it on the next scan.
+ * - An integration test makes the test process a child subreaper via
+ *   `PR_SET_CHILD_SUBREAPER`, orphans a grandchild, and verifies the
+ *   exported logic defers-then-reaps the real `<defunct>` entry while libuv
+ *   independently reaps the directly-spawned (tracked) child.
+ */
+import { spawn } from "node:child_process";
+import { readdirSync, readFileSync } from "node:fs";
+import { dlopen, FFIType, ptr } from "bun:ffi";
+import { afterAll, describe, expect, mock, test } from "bun:test";
+
+mock.module("../util/logger.js", () => ({
+  getLogger: () =>
+    new Proxy({} as Record<string, unknown>, { get: () => () => {} }),
+}));
+
+const { parseProcStat, selectReapable } = await import("./orphan-reaper.js");
+const { DaemonConfigSchema } = await import("../config/schemas/platform.js");
+
+describe("parseProcStat", () => {
+  test("parses a normal stat line", () => {
+    // GIVEN a well-formed /proc/<pid>/stat line for a zombie
+    const line = "1234 (bash) Z 1 1234 1234 0 -1 ...";
+    // WHEN it is parsed
+    const parsed = parseProcStat(line);
+    // THEN comm, state, and ppid are extracted
+    expect(parsed).toEqual({ comm: "bash", state: "Z", ppid: 1 });
+  });
+
+  test("handles comm containing spaces and parentheses", () => {
+    // GIVEN a stat line whose comm field itself contains spaces and parens
+    const line = "42 (weird (name) x) Z 1 42 42 0 -1 4194560";
+    // WHEN it is parsed
+    const parsed = parseProcStat(line);
+    // THEN fields are read relative to the final ')', not naive splitting
+    expect(parsed).toEqual({ comm: "weird (name) x", state: "Z", ppid: 1 });
+  });
+
+  test("reads a non-zombie running state", () => {
+    // GIVEN a running (non-zombie) process line
+    const line = "77 (node) R 12 77 77";
+    // WHEN it is parsed
+    const parsed = parseProcStat(line);
+    // THEN the running state and parent pid are reported
+    expect(parsed?.state).toBe("R");
+    expect(parsed?.ppid).toBe(12);
+  });
+
+  test("returns null for malformed lines", () => {
+    // GIVEN malformed or truncated stat content
+    // WHEN each is parsed
+    // THEN null is returned rather than a bogus record
+    expect(parseProcStat("")).toBeNull();
+    expect(parseProcStat("no parens here")).toBeNull();
+    expect(parseProcStat("123 (proc)")).toBeNull();
+  });
+});
+
+describe("selectReapable", () => {
+  test("defers a newly-seen zombie for one interval", () => {
+    // GIVEN zombies never seen on a previous scan
+    // WHEN deciding what to reap with an empty seen set
+    const { reap, nextSeen } = selectReapable([100, 101], new Set());
+    // THEN nothing is reaped yet, but both are carried into the next scan
+    expect(reap).toEqual([]);
+    expect([...nextSeen].sort((a, b) => a - b)).toEqual([100, 101]);
+  });
+
+  test("reaps a zombie that survived the previous scan", () => {
+    // GIVEN pid 100 was already seen on the prior scan and 101 is new
+    // WHEN deciding what to reap
+    const { reap } = selectReapable([100, 101], new Set([100]));
+    // THEN only the survivor (100) is reaped; the newcomer is deferred
+    expect(reap).toEqual([100]);
+  });
+
+  test("drops PIDs that have disappeared from the next seen set", () => {
+    // GIVEN pid 100 was seen before but is gone now
+    // WHEN computing the next seen set
+    const { nextSeen } = selectReapable([101], new Set([100, 101]));
+    // THEN only currently-present pids are retained
+    expect([...nextSeen]).toEqual([101]);
+  });
+});
+
+describe("daemon.reapOrphanedSubprocesses gate", () => {
+  test("defaults to off so the reaper is opt-in", () => {
+    // GIVEN a daemon config with the reaper flag unspecified
+    // WHEN it is parsed with schema defaults
+    const parsed = DaemonConfigSchema.parse({});
+    // THEN the reaper is disabled unless explicitly turned on
+    expect(parsed.reapOrphanedSubprocesses).toBe(false);
+  });
+
+  test("honors an explicit opt-in", () => {
+    // GIVEN a daemon config that explicitly enables the reaper
+    // WHEN it is parsed
+    const parsed = DaemonConfigSchema.parse({ reapOrphanedSubprocesses: true });
+    // THEN the flag is respected
+    expect(parsed.reapOrphanedSubprocesses).toBe(true);
+  });
+});
+
+// ── Integration: real reparented orphan on Linux ────────────────────────────
+const itLinux = process.platform === "linux" ? test : test.skip;
+
+// Bind libc only on Linux — "libc.so.6" does not exist on macOS, so binding it
+// unconditionally would throw at import and break the pure-function tests too.
+const lib =
+  process.platform === "linux"
+    ? dlopen("libc.so.6", {
+        waitpid: {
+          args: [FFIType.i32, FFIType.ptr, FFIType.i32],
+          returns: FFIType.i32,
+        },
+        prctl: {
+          args: [
+            FFIType.i32,
+            FFIType.u64,
+            FFIType.u64,
+            FFIType.u64,
+            FFIType.u64,
+          ],
+          returns: FFIType.i32,
+        },
+      })
+    : null;
+/** Linux `waitpid` option: return immediately instead of blocking. */
+const WNOHANG = 1;
+/** Linux `prctl` option that marks the calling process as a child subreaper. */
+const PR_SET_CHILD_SUBREAPER = 36;
+/** Out-param buffer for the wait status, which these tests never inspect. */
+const statusBuf = new Int32Array(1);
+
+/**
+ * List the PIDs of zombie (`Z`) processes whose parent is this test process,
+ * read from `/proc/<pid>/stat`. Entries that cannot be read are skipped.
+ */
+function zombieChildPids(): number[] {
+  const self = process.pid;
+  const out: number[] = [];
+  for (const entry of readdirSync("/proc")) {
+    const pid = Number(entry);
+    if (!Number.isInteger(pid) || pid <= 1) continue;
+    let stat: string;
+    try {
+      stat = readFileSync(`/proc/${pid}/stat`, "utf8");
+    } catch {
+      continue;
+    }
+    const parsed = parseProcStat(stat);
+    if (parsed && parsed.state === "Z" && parsed.ppid === self) out.push(pid);
+  }
+  return out;
+}
+
+/** Resolve after `ms` milliseconds. */
+const sleep = (ms: number) => new Promise<void>((r) => setTimeout(r, ms));
+
+afterAll(() => {
+  if (!lib) return;
+  // Reap anything still lingering so the test process leaves no zombies.
+  for (const pid of zombieChildPids())
+    lib.symbols.waitpid(pid, ptr(statusBuf), WNOHANG);
+});
+
+itLinux(
+  "defers then reaps a reparented orphan while libuv reaps the tracked child",
+  async () => {
+    if (!lib) return;
+    // GIVEN this process is a child subreaper, so orphaned grandchildren
+    // reparent here exactly as they reparent to a PID-1 daemon
+    expect(lib.symbols.prctl(PR_SET_CHILD_SUBREAPER, 1n, 0n, 0n, 0n)).toBe(0);
+
+    // AND a libuv-tracked child (A) that detaches a grandchild (B) into its
+    // own session and exits immediately; B reparents to us and, once it
+    // exits, becomes a zombie with our pid as its parent
+    let trackedChildExited = false;
+    const a = spawn("bash", ["-c", "setsid -f sleep 0.4; exit 0"], {
+      stdio: "ignore",
+    });
+    a.on("exit", () => {
+      trackedChildExited = true;
+    });
+
+    // AND we wait for that orphan to surface as our zombie child
+    let zombies: number[] = [];
+    for (let i = 0; i < 40 && zombies.length === 0; i++) {
+      await sleep(50);
+      zombies = zombieChildPids();
+    }
+    expect(zombies.length).toBeGreaterThan(0);
+    expect(trackedChildExited).toBe(true); // libuv reaped A independently
+
+    // WHEN we run the deferred-reap algorithm across two scans
+    // (scan #1 is the grace pass, scan #2 reaps the survivor)
+    const scan1 = selectReapable(zombieChildPids(), new Set());
+    const scan2 = selectReapable(zombieChildPids(), scan1.nextSeen);
+    let reaped = 0;
+    for (const pid of scan2.reap) {
+      if (lib.symbols.waitpid(pid, ptr(statusBuf), WNOHANG) > 0) reaped++;
+    }
+    await sleep(50);
+
+    // THEN nothing is reaped on the grace pass, the survivor is reaped on the
+    // second pass, and no defunct child remains
+    expect(scan1.reap).toEqual([]);
+    expect(scan2.reap.length).toBeGreaterThan(0);
+    expect(reaped).toBeGreaterThan(0);
+    expect(zombieChildPids()).toEqual([]);
+  },
+);
diff --git a/assistant/src/daemon/orphan-reaper.ts b/assistant/src/daemon/orphan-reaper.ts
new file mode 100644
index 00000000000..65da13497c5
--- /dev/null
+++ b/assistant/src/daemon/orphan-reaper.ts
@@ -0,0 +1,301 @@
+/**
+ * Periodic reaper for orphaned subprocesses that reparent to the daemon.
+ *
+ * Tools run commands in their own process group (`detached: true`) and, on
+ * timeout/abort, group-kill with `process.kill(-pgid, SIGKILL)` (the bash and
+ * host_bash tools, the skill sandbox runner, and the debug-bash route). The
+ * immediate child is reaped by Bun/libuv, but its descendants — e.g. git's
+ * transport helpers or a skill runner's `bun` process — were never spawned by
+ * the daemon, so when the group dies they reparent to PID 1. When the daemon
+ * runs as PID 1 in a container, Bun is not an init: it never calls `waitpid()`
+ * on those reparented orphans, so they accumulate as `<defunct>` entries that
+ * consume PID slots until the container is recycled.
+ *
+ * This reaper scans `/proc` for zombie children of the daemon and reaps each
+ * by **specific PID** with `WNOHANG`. It deliberately does NOT use
+ * `waitpid(-1)`: libuv reaps the children it spawned by specific PID on
+ * `SIGCHLD`, and a blanket `waitpid(-1)` would race libuv and could swallow a
+ * tracked child's exit status — libuv's own source handles the lost race by
+ * dropping the exit callback ("someone else stole the waitpid from us. Handle
+ * this by not handling it at all."). To stay clear of that race we only reap a
+ * zombie after it has survived at least one scan interval: libuv reaps its own
+ * within milliseconds of `SIGCHLD`, so anything still defunct a full interval
+ * later is a genuine orphan libuv is not tracking.
+ *
+ * The reaper is a no-op unless the daemon is PID 1 on Linux. Off PID 1 (local
+ * macOS dev, or if an init such as tini is ever placed above the daemon),
+ * orphans reparent to that init and are reaped there, so there is nothing for
+ * this to do. Because the daemon is PID 1, orphans already reparent to it and
+ * `PR_SET_CHILD_SUBREAPER` is unnecessary. It is additionally gated behind the
+ * `daemon.reapOrphanedSubprocesses` config flag (default off) so the behavior
+ * can be enabled per workspace for validation before becoming the default.
+ *
+ * References:
+ * - libuv reaps its own children by pid on SIGCHLD (`uv__wait_children`):
+ *   https://github.com/nodejs/node/blob/main/deps/uv/src/unix/process.c
+ * - Subreaper reaping pattern for runtimes embedding libuv (specific-pid +
+ *   WNOHANG, never `waitpid(-1)`, grace window for libuv co-existence):
+ *   https://github.com/coopergwrenn/prctl-subreaper
+ * - waitpid(2): https://man7.org/linux/man-pages/man2/waitpid.2.html
+ */
+
+import { readdirSync, readFileSync } from "node:fs";
+import { dlopen, FFIType, ptr } from "bun:ffi";
+
+import { getConfigReadOnly } from "../config/loader.js";
+import { getLogger } from "../util/logger.js";
+
+const log = getLogger("orphan-reaper");
+
+/** Linux `WNOHANG` — return immediately if no child has changed state. */
+const WNOHANG = 1;
+
+/** Delay between scans; also the grace window a zombie must outlive. */
+const SCAN_INTERVAL_MS = 60_000;
+
+/** Active scan interval handle; null while the reaper is stopped. */
+let scanTimer: ReturnType<typeof setInterval> | null = null;
+
+/** Zombie child PIDs observed on the previous scan (the grace set). */
+let seenLastScan = new Set<number>();
+
+/** Shape of libc `waitpid(pid, &wstatus, options)` as bound through FFI. */
+type WaitpidFn = (pid: number, statusPtr: unknown, options: number) => number;
+
+/** Bound libc `waitpid`, or null until `initWaitpid` succeeds. */
+let waitpid: WaitpidFn | null = null;
+// Held at module scope so the backing buffer is not GC'd while `waitStatusPtr`
+// keeps only a raw pointer into it (waitpid writes the exit status here).
+let waitStatusBuf: Int32Array | null = null;
+let waitStatusPtr: unknown = null;
+
+/**
+ * The two fields that follow `comm` in `/proc/<pid>/stat`: the state token and
+ * the parent PID as a decimal integer. Matched against the text right after
+ * the final `)`.
+ */
+const STAT_FIELDS_AFTER_COMM = /^ (\S+) (-?\d+)(?:\s|$)/;
+
+/**
+ * Bind libc `waitpid` via FFI. Returns false (and disables the reaper) if FFI
+ * is unavailable so daemon startup never fails on this subsystem.
+ *
+ * @returns true when `waitpid` is bound, either now or by an earlier call.
+ */
+function initWaitpid(): boolean {
+  if (waitpid) return true;
+  try {
+    const lib = dlopen("libc.so.6", {
+      waitpid: {
+        args: [FFIType.i32, FFIType.ptr, FFIType.i32],
+        returns: FFIType.i32,
+      },
+    });
+    // Reusable out-param buffer for the wstatus we don't inspect.
+    waitStatusBuf = new Int32Array(1);
+    waitStatusPtr = ptr(waitStatusBuf);
+    waitpid = lib.symbols.waitpid as unknown as WaitpidFn;
+    return true;
+  } catch (err) {
+    log.warn(
+      { err },
+      "Orphan reaper unavailable: failed to bind libc waitpid via FFI",
+    );
+    return false;
+  }
+}
+
+/** A defunct child of the daemon, as read from `/proc/<pid>/stat`. */
+export interface ZombieChild {
+  /** Process ID of the zombie. */
+  pid: number;
+  /** Executable name (`comm`) reported for the process. */
+  comm: string;
+}
+
+/**
+ * Parse a `/proc/<pid>/stat` line into its leading fields. `comm` (the
+ * executable name) may itself contain spaces and parentheses, so the fixed
+ * fields are read relative to the final `)` rather than by naive splitting.
+ *
+ * The parent PID must be written as decimal digits: an empty or non-decimal
+ * field (for example a line truncated right after the state) is rejected
+ * instead of being coerced by `Number()` into a bogus `0`.
+ *
+ * @param content - Raw contents of a `/proc/<pid>/stat` file.
+ * @returns The parsed `comm`, `state` and `ppid`, or null if the line is
+ *   malformed.
+ */
+export function parseProcStat(
+  content: string,
+): { comm: string; state: string; ppid: number } | null {
+  const lparen = content.indexOf("(");
+  const rparen = content.lastIndexOf(")");
+  if (lparen === -1 || rparen < lparen) return null;
+  const fields = STAT_FIELDS_AFTER_COMM.exec(content.slice(rparen + 1));
+  if (!fields) return null;
+  const [, state, ppidText] = fields;
+  if (state === undefined || ppidText === undefined) return null;
+  return {
+    comm: content.slice(lparen + 1, rparen),
+    state,
+    ppid: Number(ppidText),
+  };
+}
+
+/**
+ * Given the zombie child PIDs seen this scan and those seen on the previous
+ * scan, decide which to reap now. A zombie is only reaped once it has
+ * survived a full interval (present in `seenLast`), leaving newly-defunct
+ * children for libuv to reap first.
+ *
+ * @param current - Zombie child PIDs found by the current scan.
+ * @param seenLast - Zombie child PIDs found by the previous scan.
+ * @returns `reap`: the PIDs present in both scans, in `current` order;
+ *   `nextSeen`: a fresh set of the current PIDs to carry into the next scan.
+ */
+export function selectReapable(
+  current: readonly number[],
+  seenLast: ReadonlySet<number>,
+): { reap: number[]; nextSeen: Set<number> } {
+  const reap = current.filter((pid) => seenLast.has(pid));
+  return { reap, nextSeen: new Set(current) };
+}
+
+/**
+ * Scan `/proc` for zombie (`Z`) processes whose parent is this daemon.
+ * Reparented orphans keep their original process group but their parent
+ * becomes PID 1 (the daemon), so they appear here once defunct.
+ *
+ * @returns The zombies found; empty when `/proc` cannot be listed.
+ */
+function findZombieChildren(): ZombieChild[] {
+  const selfPid = process.pid;
+  const zombies: ZombieChild[] = [];
+  let entries: string[];
+  try {
+    entries = readdirSync("/proc");
+  } catch {
+    return zombies;
+  }
+  for (const entry of entries) {
+    const pid = Number(entry);
+    // Only numeric entries are processes; PID 1 is the daemon itself.
+    if (!Number.isInteger(pid) || pid <= 1) continue;
+    let stat: string;
+    try {
+      stat = readFileSync(`/proc/${pid}/stat`, "utf8");
+    } catch {
+      // Process exited between readdir and read — skip.
+      continue;
+    }
+    const parsed = parseProcStat(stat);
+    if (parsed && parsed.state === "Z" && parsed.ppid === selfPid) {
+      zombies.push({ pid, comm: parsed.comm });
+    }
+  }
+  return zombies;
+}
+
+/**
+ * Reap zombie children that have persisted for at least one scan interval,
+ * leaving newly-defunct children for libuv to reap first.
+ *
+ * A PID that `waitpid` has settled (reaped here, or already gone) is dropped
+ * from the grace set, so a later process that recycles the same PID has to
+ * survive a full interval of its own before it becomes reapable.
+ */
+function reapScan(): void {
+  if (!waitpid) return;
+  const zombies = findZombieChildren();
+  const byPid = new Map<number, ZombieChild>(zombies.map((z) => [z.pid, z]));
+  const { reap, nextSeen } = selectReapable([...byPid.keys()], seenLastScan);
+  const reaped: ZombieChild[] = [];
+  for (const pid of reap) {
+    const rc = waitpid(pid, waitStatusPtr, WNOHANG);
+    // rc === 0: not waitable yet — keep it in the grace set and retry later.
+    if (rc === 0) continue;
+    // rc > 0 (reaped) or rc < 0 (ECHILD/raced): this zombie is settled.
+    nextSeen.delete(pid);
+    const zombie = byPid.get(pid);
+    if (rc > 0 && zombie) reaped.push(zombie);
+  }
+  seenLastScan = nextSeen;
+  if (reaped.length > 0) {
+    log.info(
+      {
+        count: reaped.length,
+        pids: reaped.map((z) => z.pid),
+        comms: reaped.map((z) => z.comm),
+      },
+      "Reaped orphaned subprocesses reparented to the daemon (PID 1)",
+    );
+  }
+}
+
+/**
+ * Timer entry point: run one scan and log, rather than propagate, anything it
+ * throws, so a reaper fault never surfaces as an uncaught daemon exception.
+ */
+function runScheduledScan(): void {
+  try {
+    reapScan();
+  } catch (err) {
+    log.warn({ err }, "Orphan reaper scan failed");
+  }
+}
+
+/**
+ * Read the opt-in gate from workspace config (`daemon.reapOrphanedSubprocesses`),
+ * tolerating a missing or malformed config so startup never fails on it.
+ *
+ * @returns true only when the flag is explicitly `true`.
+ */
+function isReaperEnabled(): boolean {
+  try {
+    return getConfigReadOnly().daemon.reapOrphanedSubprocesses === true;
+  } catch {
+    return false;
+  }
+}
+
+/**
+ * Start the periodic orphan reaper. No-op unless the daemon is PID 1 on Linux
+ * (otherwise reparented orphans are reaped by the real init) and the
+ * `daemon.reapOrphanedSubprocesses` config gate is enabled. Calling it again
+ * while the reaper is running does nothing.
+ */
+export function startOrphanReaper(): void {
+  if (scanTimer) return;
+  if (process.platform !== "linux" || process.pid !== 1) {
+    log.info(
+      { platform: process.platform, pid: process.pid },
+      "Orphan reaper not started: daemon is not PID 1 on Linux",
+    );
+    return;
+  }
+  if (!isReaperEnabled()) {
+    log.info(
+      "Orphan reaper not started: daemon.reapOrphanedSubprocesses is disabled",
+    );
+    return;
+  }
+  if (!initWaitpid()) return;
+  seenLastScan = new Set<number>();
+  scanTimer = setInterval(runScheduledScan, SCAN_INTERVAL_MS);
+  // Never let the scan timer alone keep the process alive.
+  scanTimer.unref?.();
+  log.info({ intervalMs: SCAN_INTERVAL_MS }, "Orphan reaper started");
+}
+
+/**
+ * Stop the periodic scan, if running, and clear the grace set. Safe to call
+ * when the reaper was never started.
+ */
+export function stopOrphanReaper(): void {
+  if (scanTimer) {
+    clearInterval(scanTimer);
+    scanTimer = null;
+  }
+  seenLastScan = new Set<number>();
+}