diff --git a/bun.lock b/bun.lock index 334bd0bd0f..ea352aca95 100644 --- a/bun.lock +++ b/bun.lock @@ -924,7 +924,7 @@ }, "packages/pty-daemon": { "name": "@superset/pty-daemon", - "version": "0.2.2", + "version": "0.2.3", "bin": { "pty-daemon": "./src/main.ts", }, diff --git a/packages/host-service/src/daemon/DaemonSupervisor.node-test.ts b/packages/host-service/src/daemon/DaemonSupervisor.node-test.ts index b06454025c..b617a3e094 100644 --- a/packages/host-service/src/daemon/DaemonSupervisor.node-test.ts +++ b/packages/host-service/src/daemon/DaemonSupervisor.node-test.ts @@ -109,6 +109,7 @@ describe("DaemonSupervisor.ensure (real spawn)", () => { assert.equal(b.socketPath, a.socketPath); assert.equal(b.runningVersion, a.expectedVersion); assert.equal(b.updatePending, false); + dropSupervisorInstance(supA, "org-adopt"); } catch (err) { // On failure, make sure A still cleans up. await supA.stop("org-adopt").catch(() => {}); @@ -265,10 +266,11 @@ describe("DaemonSupervisor.ensure (real spawn)", () => { supervisorsToCleanup.push({ sup: supB, orgId }); const b = await supB.ensure(orgId); assert.equal(b.pid, adoptedPid, "B should adopt A's daemon"); + dropSupervisorInstance(supA, orgId); - // Externally kill the adopted daemon. supA never had a child - // handle so its on-exit handler can't fire; supB only adopted - // (no child handle either). The poller must catch this. + // Externally kill the adopted daemon. supA's in-memory state was + // dropped to simulate the original host-service process being gone; + // supB only adopted (no child handle). The poller must catch this. process.kill(adoptedPid, "SIGKILL"); // Wait up to 6s for the liveness poller (2s interval) to fire. @@ -644,13 +646,12 @@ describe("DaemonSupervisor.update (Phase 2 fd-handoff)", () => { } }); - test("auto-update failure force-restarts the daemon", async () => { + test("auto-update defers when live sessions exist", async () => { // Heavy path: auto-update fires on every adopt with version drift. - // When smooth handoff fails during automatic app update, there is no - // foreground UI to ask the user for the destructive fallback. The - // supervisor force-restarts so the app does not keep adopting a stale - // daemon indefinitely. - const orgId = "org-autoupdate-fail"; + // If the stale daemon owns live shells, the background path should + // not silently handoff/restart under the user's typing. The visible + // Settings action remains available for a user-approved update. + const orgId = "org-autoupdate-live-defer"; const socketPath = path.join( os.tmpdir(), `superset-ptyd-${crypto @@ -707,73 +708,50 @@ describe("DaemonSupervisor.update (Phase 2 fd-handoff)", () => { organizationId: orgId, }); - // Predecessor uses process.argv[1] (its own bundle) for handoff, - // not the supervisor's scriptPath — so injecting a bad scriptPath - // won't fail the update. Override runUpdate directly to drive the - // failure deterministically. const sup = new DaemonSupervisor({ scriptPath: DAEMON_BUNDLE }); supervisorsToCleanup.push({ sup, orgId }); + let runUpdateCalled = false; ( sup as unknown as { runUpdate: () => Promise<{ ok: false; reason: string }>; } - ).runUpdate = async () => ({ - ok: false, - reason: "induced failure (test): simulated transient error", - }); + ).runUpdate = async () => { + runUpdateCalled = true; + return { + ok: false as const, + reason: "auto-update should have deferred before runUpdate", + }; + }; const adopted = await sup.ensure(orgId); assert.equal(adopted.updatePending, true); const predecessorPid = adopted.pid; - // Auto-update is fire-and-forget. Smooth update fails first, then - // the fallback force-restart swaps the daemon pid. - const deadline = Date.now() + 5000; - let currentPid = predecessorPid; - while (Date.now() < deadline) { - const inst = ( - sup as unknown as { instances: Map } - ).instances.get(orgId); - if (inst && inst.pid !== predecessorPid) { - currentPid = inst.pid; - break; - } - await new Promise((r) => setTimeout(r, 100)); - } - assert.notEqual( - currentPid, - predecessorPid, - `auto-update fallback did not force-restart within 5s (still ${predecessorPid})`, - ); - assert.equal(isAlive(currentPid), true, "fresh daemon should be alive"); + await new Promise((r) => setTimeout(r, 500)); + const inst = ( + sup as unknown as { instances: Map } + ).instances.get(orgId); + assert.equal(inst?.pid, predecessorPid); + assert.equal(isAlive(predecessorPid), true); + assert.equal(runUpdateCalled, false); const status = sup.getUpdateStatus(orgId); assert.ok(status); assert.equal( status.pending, - false, - `pending must clear after force restart (running=${status.running})`, - ); - - const exitDeadline = Date.now() + 2000; - while (Date.now() < exitDeadline && isAlive(predecessorPid)) { - await new Promise((r) => setTimeout(r, 50)); - } - assert.equal( - isAlive(predecessorPid), - false, - "predecessor pid must be terminated by force restart", + true, + `pending should remain visible for manual update (running=${status.running})`, ); const verifyClient = new DaemonClient({ socketPath }); await verifyClient.connect(); const sessions = await verifyClient.list(); const survivor = sessions.find((s) => s.id === "survivor"); - assert.equal( + assert.ok( survivor, - undefined, - `force restart must drop predecessor sessions: ${JSON.stringify(sessions)}`, + `live session should remain on predecessor: ${JSON.stringify(sessions)}`, ); + assert.equal(survivor.pid, shellPid); await verifyClient.dispose(); } catch (err) { try { @@ -805,6 +783,15 @@ function isAlive(pid: number): boolean { } } +function dropSupervisorInstance( + sup: DaemonSupervisor, + organizationId: string, +): void { + (sup as unknown as { instances: Map }).instances.delete( + organizationId, + ); +} + function isReachable(socketPath: string): Promise { return new Promise((resolve) => { const sock = net.createConnection({ path: socketPath }); diff --git a/packages/host-service/src/daemon/DaemonSupervisor.test.ts b/packages/host-service/src/daemon/DaemonSupervisor.test.ts index 879399ff9b..869cd6bca1 100644 --- a/packages/host-service/src/daemon/DaemonSupervisor.test.ts +++ b/packages/host-service/src/daemon/DaemonSupervisor.test.ts @@ -10,6 +10,8 @@ // filter for our component prefix. import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test"; +import * as childProcess from "node:child_process"; +import * as fs from "node:fs"; import * as net from "node:net"; import * as os from "node:os"; import * as path from "node:path"; @@ -18,7 +20,12 @@ import { encodeFrame, FrameDecoder, } from "@superset/pty-daemon/protocol"; -import { DaemonSupervisor, probeDaemonVersion } from "./DaemonSupervisor.ts"; +import { + DaemonSupervisor, + probeDaemonVersion, + shouldKillStaleDaemonForDev, +} from "./DaemonSupervisor.ts"; +import { writePtyDaemonManifest } from "./manifest.ts"; // Capture supervisor-emitted log events. We replace console.log for the // duration of the test, then filter for our supervisor's component prefix. @@ -187,6 +194,85 @@ describe("probeDaemonVersion", () => { }); }); +describe("shouldKillStaleDaemonForDev", () => { + test("keeps production adoption behavior", () => { + expect(shouldKillStaleDaemonForDev({ NODE_ENV: "production" })).toBe(false); + }); + + test("kills stale daemons in dev by default", () => { + expect(shouldKillStaleDaemonForDev({ NODE_ENV: "development" })).toBe(true); + expect(shouldKillStaleDaemonForDev({})).toBe(true); + }); + + test("allows production-like adoption in dev for handoff testing", () => { + expect( + shouldKillStaleDaemonForDev({ + NODE_ENV: "development", + SUPERSET_PTY_DAEMON_ADOPT_IN_DEV: "1", + }), + ).toBe(false); + }); +}); + +describe("DaemonSupervisor.tryAdopt", () => { + test("rejects and replaces a reachable daemon that cannot answer the version probe", async () => { + const orgId = "org-unprobeable"; + const fake = await startFakeDaemon({ silent: true }); + const child = childProcess.spawn( + process.execPath, + ["-e", "setInterval(() => {}, 1000)"], + { stdio: "ignore" }, + ); + const childPid = child.pid; + expect(typeof childPid).toBe("number"); + if (!childPid) { + await fake.close(); + return; + } + + const originalHome = process.env.SUPERSET_HOME_DIR; + const tmpHome = fs.mkdtempSync(path.join(os.tmpdir(), "pty-daemon-unit-")); + process.env.SUPERSET_HOME_DIR = tmpHome; + try { + writePtyDaemonManifest({ + pid: childPid, + socketPath: fake.socketPath, + protocolVersions: [1], + startedAt: Date.now(), + organizationId: orgId, + }); + + const sup = new DaemonSupervisor({ scriptPath: "/nonexistent" }); + const adopted = await invokeTryAdopt(sup, orgId); + expect(adopted).toBeNull(); + expect( + loggedEvents.some( + (e) => + e.event === "pty_daemon_adopt_rejected" && + e.props.reason === "version_probe_failed" && + e.props.pid === childPid, + ), + ).toBe(true); + expect(await waitForProcessExit(childPid, 2500)).toBe(true); + } finally { + await fake.close(); + if (isProcessAliveForTest(childPid)) { + try { + process.kill(childPid, "SIGKILL"); + } catch { + // already gone + } + } + if (originalHome !== undefined) { + process.env.SUPERSET_HOME_DIR = originalHome; + } else { + delete process.env.SUPERSET_HOME_DIR; + } + fs.rmSync(tmpHome, { recursive: true, force: true }); + } + }); +}); + describe("DaemonSupervisor.getUpdateStatus", () => { let sup: DaemonSupervisor; @@ -515,6 +601,7 @@ describe("DaemonSupervisor auto-update fallback", () => { test("force-restarts when the background smooth update returns ok:false", async () => { const instance = staleInstance("0.0.9"); seedDaemonInstance(sup, "org-auto-fallback", instance); + mockListSessions(sup, []); const runUpdateMock = mock(async () => ({ ok: false as const, reason: "snapshot write failed: ENOSPC", @@ -541,7 +628,7 @@ describe("DaemonSupervisor auto-update fallback", () => { ).forceRestart = forceRestartMock; invokeKickoffAutoUpdate(sup, "org-auto-fallback", instance); - await new Promise((r) => setTimeout(r, 0)); + await flushAutoUpdate(); expect(runUpdateMock).toHaveBeenCalledWith("org-auto-fallback"); expect(forceRestartMock).toHaveBeenCalledTimes(1); @@ -565,6 +652,7 @@ describe("DaemonSupervisor auto-update fallback", () => { test("force-restarts when the background smooth update throws", async () => { const instance = staleInstance("0.0.8"); seedDaemonInstance(sup, "org-auto-throw", instance); + mockListSessions(sup, []); const runUpdateMock = mock(async () => { throw new Error("transport: ECONNRESET"); }); @@ -590,7 +678,7 @@ describe("DaemonSupervisor auto-update fallback", () => { ).forceRestart = forceRestartMock; invokeKickoffAutoUpdate(sup, "org-auto-throw", instance); - await new Promise((r) => setTimeout(r, 0)); + await flushAutoUpdate(); expect(forceRestartMock).toHaveBeenCalledTimes(1); expect(forceRestartCalls[0]?.log).toMatchObject({ @@ -604,6 +692,7 @@ describe("DaemonSupervisor auto-update fallback", () => { test("skips force-restart when the failed stale daemon is no longer current", async () => { const instance = staleInstance("0.0.7"); seedDaemonInstance(sup, "org-auto-changed", instance); + mockListSessions(sup, []); const runUpdateMock = mock(async () => { seedDaemonInstance(sup, "org-auto-changed", { ...instance, @@ -629,7 +718,7 @@ describe("DaemonSupervisor auto-update fallback", () => { ).forceRestart = forceRestartMock; invokeKickoffAutoUpdate(sup, "org-auto-changed", instance); - await new Promise((r) => setTimeout(r, 0)); + await flushAutoUpdate(); expect(forceRestartMock).not.toHaveBeenCalled(); expect( @@ -643,9 +732,78 @@ describe("DaemonSupervisor auto-update fallback", () => { ).toBe(true); }); + test("defers the background update when live sessions are present", async () => { + const instance = staleInstance("0.0.6"); + seedDaemonInstance(sup, "org-auto-live", instance); + mockListSessions(sup, [aliveSession()]); + const runUpdateMock = mock(async () => ({ + ok: true as const, + successorPid: 7777, + })); + (sup as unknown as { runUpdate: typeof runUpdateMock }).runUpdate = + runUpdateMock; + + invokeKickoffAutoUpdate(sup, "org-auto-live", instance); + await flushAutoUpdate(); + + expect(runUpdateMock).not.toHaveBeenCalled(); + expect( + loggedEvents.some( + (e) => + e.event === "pty_daemon_auto_update_deferred" && + e.props.reason === "live_sessions_present" && + e.props.aliveSessionCount === 1 && + e.props.pid === instance.pid, + ), + ).toBe(true); + }); + + test("skips force-restart when live sessions appear after a failed smooth update", async () => { + const instance = staleInstance("0.0.5"); + seedDaemonInstance(sup, "org-auto-race", instance); + let listCalls = 0; + const listSessionsMock = mock(async () => { + listCalls += 1; + return listCalls === 1 ? [] : [aliveSession("late-live")]; + }); + (sup as unknown as { listSessions: typeof sup.listSessions }).listSessions = + listSessionsMock as typeof sup.listSessions; + const runUpdateMock = mock(async () => ({ + ok: false as const, + reason: "successor ack timed out after 5000ms", + })); + const forceRestartMock = mock(async () => ({ success: true as const })); + (sup as unknown as { runUpdate: typeof runUpdateMock }).runUpdate = + runUpdateMock; + ( + sup as unknown as { + forceRestart: ( + organizationId: string, + log: ForceRestartLog, + ) => Promise<{ success: true }>; + } + ).forceRestart = forceRestartMock; + + invokeKickoffAutoUpdate(sup, "org-auto-race", instance); + await flushAutoUpdate(); + + expect(runUpdateMock).toHaveBeenCalledTimes(1); + expect(forceRestartMock).not.toHaveBeenCalled(); + expect( + loggedEvents.some( + (e) => + e.event === "pty_daemon_auto_update_force_restart_skipped" && + e.props.reason === "live_sessions_present" && + e.props.aliveSessionCount === 1 && + e.props.smoothUpdatePid === instance.pid, + ), + ).toBe(true); + }); + test("skips force-restart when auto-update joins an existing manual update", async () => { const instance = staleInstance("0.0.6"); seedDaemonInstance(sup, "org-auto-coalesced", instance); + mockListSessions(sup, []); const deferred = createDeferred<{ ok: false; reason: string }>(); const runUpdateMock = mock(() => deferred.promise); const forceRestartMock = mock(async () => ({ success: true as const })); @@ -668,7 +826,7 @@ describe("DaemonSupervisor auto-update fallback", () => { }); await manualUpdate; - await new Promise((r) => setTimeout(r, 0)); + await flushAutoUpdate(); expect(runUpdateMock).toHaveBeenCalledTimes(1); expect(forceRestartMock).not.toHaveBeenCalled(); @@ -765,6 +923,30 @@ function seedDaemonInstance( ); } +function mockListSessions( + sup: DaemonSupervisor, + sessions: Awaited>, +): void { + const listSessionsMock = mock(async () => sessions); + (sup as unknown as { listSessions: typeof sup.listSessions }).listSessions = + listSessionsMock as typeof sup.listSessions; +} + +function aliveSession(id = "live") { + return { + id, + pid: 4321, + cols: 80, + rows: 24, + alive: true, + }; +} + +async function flushAutoUpdate(): Promise { + await new Promise((r) => setTimeout(r, 0)); + await new Promise((r) => setTimeout(r, 0)); +} + function freshInstance() { return { pid: 1234, @@ -810,3 +992,35 @@ function invokeKickoffAutoUpdate( } ).kickoffAutoUpdate(organizationId, instance); } + +function invokeTryAdopt( + sup: DaemonSupervisor, + organizationId: string, +): Promise { + return ( + sup as unknown as { + tryAdopt: (id: string) => Promise; + } + ).tryAdopt(organizationId); +} + +async function waitForProcessExit( + pid: number, + timeoutMs: number, +): Promise { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + if (!isProcessAliveForTest(pid)) return true; + await new Promise((r) => setTimeout(r, 25)); + } + return false; +} + +function isProcessAliveForTest(pid: number): boolean { + try { + process.kill(pid, 0); + return true; + } catch (err) { + return (err as NodeJS.ErrnoException).code === "EPERM"; + } +} diff --git a/packages/host-service/src/daemon/DaemonSupervisor.ts b/packages/host-service/src/daemon/DaemonSupervisor.ts index 9a8292e60f..2813de5d49 100644 --- a/packages/host-service/src/daemon/DaemonSupervisor.ts +++ b/packages/host-service/src/daemon/DaemonSupervisor.ts @@ -55,6 +55,8 @@ const VERSION_PROBE_TIMEOUT_MS = 1_500; const HANDOFF_PREDECESSOR_EXIT_TIMEOUT_MS = 3_000; const HANDOFF_PROBE_TOTAL_TIMEOUT_MS = 3_000; const DAEMON_TERMINATE_TIMEOUT_MS = 1_000; +const AUTO_UPDATE_SESSION_LIST_TIMEOUT_MS = 1_500; +const ADOPTION_PROBE_TOTAL_TIMEOUT_MS = 3_000; /** * Crash supervision parameters. If the daemon for an organization crashes @@ -66,6 +68,14 @@ const CRASH_BUDGET = 3; const CRASH_WINDOW_MS = 60_000; /** How often to poll an adopted daemon's PID for liveness. */ const ADOPTED_LIVENESS_INTERVAL_MS = 2_000; +const ADOPT_IN_DEV_ENV = "SUPERSET_PTY_DAEMON_ADOPT_IN_DEV"; + +export function shouldKillStaleDaemonForDev( + env: NodeJS.ProcessEnv = process.env, +): boolean { + if (env.NODE_ENV === "production") return false; + return env[ADOPT_IN_DEV_ENV] !== "1"; +} /** * Per-organization socket path. **Must stay short** — Darwin's `sun_path` @@ -489,10 +499,10 @@ export class DaemonSupervisor { /** * Auto-update: best-effort opportunistic handoff when the adopted * daemon is older than the bundled binary. Runs after host-service - * boot, fire-and-track, doesn't block anything. On smooth handoff - * failure we force-restart the daemon: auto-update has no foreground - * UI to ask the user for the destructive fallback, and leaving a stale - * daemon adopted by a newer app risks a bad mixed-version state. + * boot, fire-and-track, doesn't block anything. The background path + * is intentionally conservative: live sessions keep running on the + * predecessor and the foreground Settings UI remains the place for + * user-approved handoff/restart. */ private kickoffAutoUpdate( organizationId: string, @@ -504,67 +514,149 @@ export class DaemonSupervisor { expectedVersion: instance.expectedVersion, pid: instance.pid, }); + void this.runAutoUpdate(organizationId, instance).catch((err) => { + logEvent("pty_daemon_auto_update_failed", { + organizationId, + pid: instance.pid, + runningVersion: instance.runningVersion, + expectedVersion: instance.expectedVersion, + reason: `threw: ${(err as Error).message}`, + }); + }); + } + + private async runAutoUpdate( + organizationId: string, + instance: DaemonInstance, + ): Promise { + const sessions = await this.listSessions( + organizationId, + AUTO_UPDATE_SESSION_LIST_TIMEOUT_MS, + ); + if (sessions === null) { + this.deferAutoUpdate( + organizationId, + instance, + "session_list_unavailable", + ); + return; + } + const aliveSessionCount = countAliveSessions(sessions); + if (aliveSessionCount > 0) { + this.deferAutoUpdate(organizationId, instance, "live_sessions_present", { + aliveSessionCount, + }); + return; + } + // If a manual Update click already owns the in-flight handoff, leave // any destructive fallback to that foreground UI path. const update = this.startUpdate(organizationId); - void update.promise.then( - (result) => { - if (result.ok) { - logEvent("pty_daemon_auto_update_ok", { - organizationId, - previousPid: instance.pid, - successorPid: result.successorPid, - previousVersion: instance.runningVersion, - }); - } else { - logEvent("pty_daemon_auto_update_failed", { - organizationId, - pid: instance.pid, - runningVersion: instance.runningVersion, - expectedVersion: instance.expectedVersion, - reason: result.reason, - }); - if (!update.started) { - this.skipAutoUpdateForceRestart( - organizationId, - instance, - result.reason, - "update_already_in_flight", - ); - return; - } - void this.forceRestartAfterAutoUpdateFailure( - organizationId, - instance, - result.reason, - ); - } - }, - (err) => { - const reason = `threw: ${(err as Error).message}`; - logEvent("pty_daemon_auto_update_failed", { + try { + const result = await update.promise; + if (result.ok) { + logEvent("pty_daemon_auto_update_ok", { organizationId, - pid: instance.pid, - runningVersion: instance.runningVersion, - expectedVersion: instance.expectedVersion, - reason, + previousPid: instance.pid, + successorPid: result.successorPid, + previousVersion: instance.runningVersion, }); - if (!update.started) { - this.skipAutoUpdateForceRestart( - organizationId, - instance, - reason, - "update_already_in_flight", - ); - return; - } - void this.forceRestartAfterAutoUpdateFailure( + return; + } + + logEvent("pty_daemon_auto_update_failed", { + organizationId, + pid: instance.pid, + runningVersion: instance.runningVersion, + expectedVersion: instance.expectedVersion, + reason: result.reason, + }); + if (!update.started) { + this.skipAutoUpdateForceRestart( + organizationId, + instance, + result.reason, + "update_already_in_flight", + ); + return; + } + await this.forceRestartAfterAutoUpdateFailure( + organizationId, + instance, + result.reason, + ); + } catch (err) { + const reason = `threw: ${(err as Error).message}`; + logEvent("pty_daemon_auto_update_failed", { + organizationId, + pid: instance.pid, + runningVersion: instance.runningVersion, + expectedVersion: instance.expectedVersion, + reason, + }); + if (!update.started) { + this.skipAutoUpdateForceRestart( organizationId, instance, reason, + "update_already_in_flight", ); - }, + return; + } + await this.forceRestartAfterAutoUpdateFailure( + organizationId, + instance, + reason, + ); + } + } + + private deferAutoUpdate( + organizationId: string, + instance: DaemonInstance, + reason: string, + extra: Record = {}, + ): void { + logEvent("pty_daemon_auto_update_deferred", { + organizationId, + pid: instance.pid, + runningVersion: instance.runningVersion, + expectedVersion: instance.expectedVersion, + reason, + ...extra, + }); + } + + private async canAutoUpdateForceRestart( + organizationId: string, + instance: DaemonInstance, + failureReason: string, + ): Promise { + const sessions = await this.listSessions( + organizationId, + AUTO_UPDATE_SESSION_LIST_TIMEOUT_MS, ); + if (sessions === null) { + this.skipAutoUpdateForceRestart( + organizationId, + instance, + failureReason, + "session_list_unavailable", + ); + return false; + } + const aliveSessionCount = countAliveSessions(sessions); + if (aliveSessionCount > 0) { + this.skipAutoUpdateForceRestart( + organizationId, + instance, + failureReason, + "live_sessions_present", + { aliveSessionCount }, + ); + return false; + } + return true; } private skipAutoUpdateForceRestart( @@ -572,12 +664,14 @@ export class DaemonSupervisor { instance: DaemonInstance, failureReason: string, reason: string, + extra: Record = {}, ): void { logEvent("pty_daemon_auto_update_force_restart_skipped", { organizationId, smoothUpdatePid: instance.pid, smoothUpdateFailureReason: failureReason, reason, + ...extra, }); } @@ -620,6 +714,11 @@ export class DaemonSupervisor { }); return; } + if ( + !(await this.canAutoUpdateForceRestart(organizationId, current, reason)) + ) { + return; + } try { await this.forceRestart(organizationId, { @@ -677,7 +776,7 @@ export class DaemonSupervisor { // reason. Kill any running daemon for the org and spawn fresh. // Production keeps the adopt path so PTY sessions survive // host-service restarts (the original Phase 1 promise). - if (process.env.NODE_ENV !== "production") { + if (shouldKillStaleDaemonForDev()) { await this.killStaleDaemonForDev(organizationId); } @@ -759,13 +858,26 @@ export class DaemonSupervisor { return null; } - const probed = await probeDaemonVersion( + const probed = await probeDaemonVersionWithRetry( manifest.socketPath, - VERSION_PROBE_TIMEOUT_MS, + ADOPTION_PROBE_TOTAL_TIMEOUT_MS, + ); + if (!probed) { + logEvent("pty_daemon_adopt_rejected", { + organizationId, + pid: manifest.pid, + socketPath: manifest.socketPath, + reason: "version_probe_failed", + }); + await terminateProcessTreeAndGroups(manifest.pid, "SIGTERM"); + removePtyDaemonManifest(organizationId); + return null; + } + const runningVersion = probed; + const updatePending = !semver.satisfies( + probed, + `>=${EXPECTED_DAEMON_VERSION}`, ); - const runningVersion = probed ?? "unknown"; - const updatePending = - !!probed && !semver.satisfies(probed, `>=${EXPECTED_DAEMON_VERSION}`); return { pid: manifest.pid, @@ -987,6 +1099,10 @@ function pipeWithPrefix( }); } +function countAliveSessions(sessions: SessionInfo[]): number { + return sessions.filter((session) => session.alive).length; +} + async function waitForSocket( socketPath: string, timeoutMs: number, diff --git a/packages/pty-daemon/package.json b/packages/pty-daemon/package.json index 4190492ebc..299f55cf10 100644 --- a/packages/pty-daemon/package.json +++ b/packages/pty-daemon/package.json @@ -1,6 +1,6 @@ { "name": "@superset/pty-daemon", - "version": "0.2.2", + "version": "0.2.3", "private": true, "type": "module", "exports": { diff --git a/packages/pty-daemon/src/Pty/Pty.ts b/packages/pty-daemon/src/Pty/Pty.ts index 975975e646..054b09c617 100644 --- a/packages/pty-daemon/src/Pty/Pty.ts +++ b/packages/pty-daemon/src/Pty/Pty.ts @@ -1,5 +1,6 @@ import * as childProcess from "node:child_process"; import * as fs from "node:fs"; +import * as tty from "node:tty"; import * as nodePty from "node-pty"; import { type ProcessSignalError, @@ -195,7 +196,7 @@ export function spawn({ meta }: SpawnOptions): Pty { * `forkpty` was run; the fd already existed). We build a thin adapter * directly on the fd: * - * - read via fs.createReadStream + * - read via tty.ReadStream * - write via direct fs.writeSync calls * - kill via process.kill(pid) * - onExit: read-stream 'end'/'error' OR PID-liveness poll (whichever first) @@ -210,7 +211,7 @@ class AdoptedPty implements Pty { readonly pid: number; meta: SessionMeta; private readonly fd: number; - private readonly reader: fs.ReadStream; + private readonly reader: tty.ReadStream; private exitFired = false; private livenessTimer: NodeJS.Timeout | null = null; private killEscalationTimer: NodeJS.Timeout | null = null; @@ -220,7 +221,7 @@ class AdoptedPty implements Pty { this.fd = fd; this.pid = pid; this.meta = meta; - this.reader = fs.createReadStream("", { fd, autoClose: false }); + this.reader = new tty.ReadStream(fd); // onExit signal sources: // 1. read stream 'end' or 'error' — the slave-side close drives EOF @@ -232,20 +233,12 @@ class AdoptedPty implements Pty { if (this.exitFired) return; this.exitFired = true; if (this.livenessTimer) clearInterval(this.livenessTimer); - // Close the read stream and inherited fd so the successor daemon - // does not stay alive after the shell exits. We use - // `autoClose: false` so handoff-time refcounting works, which - // means we drive fd close ourselves. + // tty.ReadStream owns the inherited fd; destroying the stream closes it. try { this.reader.destroy(); } catch { // already closed } - try { - fs.closeSync(this.fd); - } catch { - // already closed - } for (const cb of this.exitCallbacks) cb(info); }; this.reader.on("end", () => onExit({ code: null, signal: null }));