Skip to content
Merged
3 changes: 2 additions & 1 deletion packages/host-service/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ export interface CreateAppResult {
app: Hono;
injectWebSocket: ReturnType<typeof createNodeWebSocket>["injectWebSocket"];
api: ApiClient;
db: HostDb;
dispose: () => Promise<void>;
}

Expand Down Expand Up @@ -213,5 +214,5 @@ export function createApp(options: CreateAppOptions): CreateAppResult {
}
};

return { app, injectWebSocket, api, dispose };
return { app, injectWebSocket, api, db, dispose };
}
5 changes: 4 additions & 1 deletion packages/host-service/src/serve.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { PskHostAuthProvider } from "./providers/host-auth";
import { LocalModelProvider } from "./providers/model-providers";
import { installProcessSafetyNet } from "./safety";
import { initTerminalBaseEnv, resolveTerminalBaseEnv } from "./terminal/env";
import { startTerminalReaper } from "./terminal/reaper";
import { connectRelay } from "./tunnel";

async function main(): Promise<void> {
Expand Down Expand Up @@ -45,7 +46,7 @@ async function main(): Promise<void> {
apiUrl: env.SUPERSET_API_URL,
});

const { app, injectWebSocket, api } = createApp({
const { app, injectWebSocket, api, db } = createApp({
config: {
organizationId: env.ORGANIZATION_ID,
dbPath: env.HOST_DB_PATH,
Expand Down Expand Up @@ -95,6 +96,8 @@ async function main(): Promise<void> {
installProcessSafetyNet();
console.log(`[host-service] listening on http://localhost:${info.port}`);

startTerminalReaper(db);

if (env.RELAY_URL) {
void connectRelay({
api,
Expand Down
1 change: 1 addition & 0 deletions packages/host-service/src/terminal/reaper/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { startTerminalReaper } from "./reaper.ts";
105 changes: 105 additions & 0 deletions packages/host-service/src/terminal/reaper/reaper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import type { HostDb } from "../../db/index.ts";
import { terminalSessions } from "../../db/schema.ts";
import { getDaemonClient } from "../daemon-client-singleton.ts";
import { disposeSessionAndWait } from "../terminal.ts";

interface ReapResult {
reaped: number;
failed: number;
}

const REAP_INTERVAL_MS = 5 * 60 * 1000;

async function reapOrphanedSessions(
db: HostDb,
rowlessPendingSecondPass: Set<string>,
): Promise<ReapResult> {
const daemon = await getDaemonClient();
const liveSessions = (await daemon.list()).filter((session) => session.alive);
if (liveSessions.length === 0) {
rowlessPendingSecondPass.clear();
return { reaped: 0, failed: 0 };
}

const rows = db
.select({
id: terminalSessions.id,
status: terminalSessions.status,
originWorkspaceId: terminalSessions.originWorkspaceId,
})
.from(terminalSessions)
.all();
const rowById = new Map(rows.map((row) => [row.id, row]));

const orphans: { id: string; rowless: boolean }[] = [];
const stillRowless = new Set<string>();
for (const session of liveSessions) {
const row = rowById.get(session.id);
if (!row) {
if (rowlessPendingSecondPass.has(session.id)) {
orphans.push({ id: session.id, rowless: true });
} else {
stillRowless.add(session.id);
}
continue;
}
if (
row.status === "disposed" ||
row.status === "exited" ||
!row.originWorkspaceId
) {
orphans.push({ id: session.id, rowless: false });
}
}

let reaped = 0;
let failed = 0;
for (const orphan of orphans) {
try {
const result = await disposeSessionAndWait(orphan.id, db);
if (result.daemonCloseSucceeded) {
reaped += 1;
continue;
}
} catch {
// fall through to the failure path below
}
failed += 1;
// A failed kill on a confirmed (second-pass) rowless orphan is kept
// pending so the next pass retries it instead of restarting its
// two-pass clock.
if (orphan.rowless) stillRowless.add(orphan.id);
}

rowlessPendingSecondPass.clear();
for (const id of stillRowless) rowlessPendingSecondPass.add(id);

return { reaped, failed };
}

export function startTerminalReaper(db: HostDb): () => void {
const rowlessPendingSecondPass = new Set<string>();
let running = false;
const run = () => {
if (running) return;
running = true;
void reapOrphanedSessions(db, rowlessPendingSecondPass)
.then((result) => {
if (result.reaped > 0 || result.failed > 0) {
console.log(
`[host-service] terminal reaper: ${result.reaped} reaped, ${result.failed} failed`,
);
}
})
.catch((err) => {
console.warn("[host-service] terminal reaper failed:", err);
})
.finally(() => {
running = false;
});
};
run();
const interval = setInterval(run, REAP_INTERVAL_MS);
interval.unref();
return () => clearInterval(interval);
}
13 changes: 8 additions & 5 deletions packages/host-service/src/terminal/terminal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -733,14 +733,17 @@ export async function disposeSessionAndWait(

portManager.unregisterSession(terminalId);

db.update(terminalSessions)
.set({ status: "disposed", endedAt: Date.now() })
.where(eq(terminalSessions.id, terminalId))
.run();

const closeResult = closePromise
? await closePromise
: { attempted: false, succeeded: true };

if (closeResult.succeeded) {
db.update(terminalSessions)
.set({ status: "disposed", endedAt: Date.now() })
.where(eq(terminalSessions.id, terminalId))
.run();
}

return {
terminalId,
daemonCloseAttempted: closeResult.attempted,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { existsSync } from "node:fs";
import { TRPCError } from "@trpc/server";
import { eq } from "drizzle-orm";
import { and, eq, ne } from "drizzle-orm";
import { z } from "zod";
import { workspaces } from "../../../db/schema";
import { terminalSessions, workspaces } from "../../../db/schema";
import { invalidateLabelCache } from "../../../ports/static-ports";
import { runTeardown, type TeardownResult } from "../../../runtime/teardown";
import { disposeSessionsByWorkspaceId } from "../../../terminal/terminal";
Expand Down Expand Up @@ -276,6 +276,26 @@ async function runDestroy(
warnings.push(`Failed to dispose terminal sessions: ${message}`);
}

// Drop this workspace's terminal rows so its session index dies with it
// rather than lingering as `set null` orphans. Confirmed-dead rows only:
// a still-`active` row is a failed kill we keep reachable for the reaper.
try {
ctx.db
.delete(terminalSessions)
.where(
and(
eq(terminalSessions.originWorkspaceId, input.workspaceId),
ne(terminalSessions.status, "active"),
),
)
.run();
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
warnings.push(
`Failed to clear terminal session rows for ${input.workspaceId}: ${message}`,
);
}

// 2b. Worktree. Double-force unlocks the rare locked-worktree case and
// clears stale metadata when the directory was manually removed.
let worktreeRemoved = false;
Expand Down
Loading