diff --git a/bun.lock b/bun.lock index e1f79454908..b58736baab0 100644 --- a/bun.lock +++ b/bun.lock @@ -868,6 +868,22 @@ "typescript": "^5.9.3", }, }, + "packages/pty-daemon": { + "name": "@superset/pty-daemon", + "version": "0.1.0", + "bin": { + "pty-daemon": "./src/main.ts", + }, + "dependencies": { + "node-pty": "1.1.0", + }, + "devDependencies": { + "@superset/typescript": "workspace:*", + "@types/node": "^24.9.1", + "bun-types": "^1.3.1", + "typescript": "^5.9.3", + }, + }, "packages/shared": { "name": "@superset/shared", "version": "0.1.0", @@ -2573,6 +2589,8 @@ "@superset/port-scanner": ["@superset/port-scanner@workspace:packages/port-scanner"], + "@superset/pty-daemon": ["@superset/pty-daemon@workspace:packages/pty-daemon"], + "@superset/relay": ["@superset/relay@workspace:apps/relay"], "@superset/shared": ["@superset/shared@workspace:packages/shared"], diff --git a/packages/pty-daemon/README.md b/packages/pty-daemon/README.md new file mode 100644 index 00000000000..b2f169cdd85 --- /dev/null +++ b/packages/pty-daemon/README.md @@ -0,0 +1,119 @@ +# @superset/pty-daemon + +Long-lived PTY-owning process for the v2 desktop terminal. host-service is a +client over a Unix socket; routine host-service upgrades don't touch shells. + +Implements [Phase 1 of the daemon plan](../../apps/desktop/plans/20260429-pty-daemon-implementation.md). +This package is **standalone**: it does not import from `@superset/host-service` +or any other workspace package. Host-service consumes only the protocol types +via `@superset/pty-daemon/protocol`. + +## Runtime + +**Production: Node ≥ 20** (Electron's bundled Node), via +`process.execPath` — exactly the same pattern as `host-service` already +uses today (`packages/host-service/build.ts` → `dist/host-service.js`, +spawned by `apps/desktop/src/main/lib/host-service-coordinator.ts`). +Bun is the build tool, not a runtime. **No new runtime in the desktop +app bundle.** + +**Why not Bun at runtime:** verified during development that node-pty +1.1's master fd handling is incompatible with Bun 1.3 (`tty.ReadStream` +closes immediately, alternate `fs.createReadStream(null, { fd })` +returns EAGAIN with no recovery). The daemon needs a runtime where +node-pty actually works. + +**Dev:** unit tests run under Bun (`bun test`) for speed; integration +tests run under Node (`bun run test:integration`) since they touch real +PTYs. The daemon binary itself runs under Node in both dev and prod. + +## Layout + +``` +src/ +├── main.ts # Node entrypoint: argv → Server.listen() +├── index.ts # Public exports for host-service consumers +├── protocol/ # Wire schemas + length-prefixed framing +│ ├── version.ts # CURRENT_PROTOCOL_VERSION + supported list +│ ├── messages.ts # ClientMessage / ServerMessage unions +│ ├── framing.ts # encodeFrame / FrameDecoder (4-byte BE prefix) +│ └── index.ts +├── Pty/ # node-pty thin wrapper with dim validation +│ ├── Pty.ts +│ └── index.ts +├── SessionStore/ # in-memory map + 64KB ring buffer per session +│ ├── SessionStore.ts +│ └── index.ts +├── handlers/ # pure functions: open/input/resize/close/list/subscribe +│ ├── handlers.ts +│ └── index.ts +└── Server/ # AF_UNIX SOCK_STREAM accept loop, handshake, dispatch + ├── Server.ts + └── index.ts + +test/ +├── helpers/ +│ └── client.ts # reusable DaemonClient: connect, send, waitFor, collect +├── integration.test.ts # smoke / happy-path (3 tests) +└── control-plane.test.ts # exhaustive control-plane coverage (25 tests) + +build.ts # Bun bundler → dist/pty-daemon.js (target: node) +``` + +## Design notes + +- **Stateless from the client's perspective.** Every protocol call carries + full context. No client tracking, no session tombstones, no business + rules. Single design principle from + [the implementation plan](../../apps/desktop/plans/20260429-pty-daemon-implementation.md#the-single-design-principle). +- **Auth boundary = Unix socket file mode 0600.** No in-band tokens. The + daemon trusts whoever can open the socket. +- **Buffer is in-memory only.** Survives host-service restarts (because the + daemon does), but never persisted to disk. No SQLite, no scrollback files. + v1's `HistoryManager` is explicitly out of scope. +- **Protocol versioned from day one.** Handshake (`hello` / `hello-ack`) + picks the highest mutually supported version. + +## Testing + +```sh +bun test # 24 unit tests (protocol framing, handlers, SessionStore, Pty validation) +bun run test:integration # 28 integration tests under node --test: + # - test/integration.test.ts (smoke / happy-path, 3 tests) + # - test/control-plane.test.ts (every usage pattern, 25 tests) +bun run typecheck # tsc --noEmit +bun run build:daemon # bundle src/main.ts → dist/pty-daemon.js (target: node) +``` + +**Control-plane coverage** (`test/control-plane.test.ts`): + +- Handshake: rejects non-hello first, picks highest mutual protocol, rejects unsupported, rejects duplicate hello. +- Session lifecycle: invalid dims, duplicate ids, ENOENT on missing, instant-exit shells, SIGKILL on hung shells. +- I/O patterns: resize during running shell, burst output (200 lines), multi-byte UTF-8 (🚀). +- Multi-client fan-out: two subscribers see same output, unsubscribe stops further delivery, dropped subscriber doesn't crash daemon. +- Detach + reattach (the headline feature): late subscriber gets replay, full reattach cycle continues live after disconnect. +- list reflects active sessions with cols/rows/alive. +- Hostile input: malformed frames disconnect cleanly, oversized frames are rejected, input on exited session returns EEXITED. +- Concurrency: 20 sessions in parallel from one connection, 10 connections opening sessions in parallel. +- Server shutdown: in-flight clients disconnect cleanly, owned PTYs are killed. +- Framing: tolerates split frames across multiple TCP chunks. + +Why two runners? `bun test` is fast for pure-JS work. node-pty doesn't work +under Bun, so anything that spawns a real PTY runs under Node. + +## Running locally + +```sh +bun run start --socket=/tmp/pty-daemon.sock +``` + +Logs go to stderr; stdout stays empty (so the daemon can later be supervised +by host-service with stdout reserved for protocol or kept dark). + +## Out of scope (Phase 1) + +- Host-service integration (DaemonClient, terminal.ts refactor, manifest + adoption) — separate PR. +- Daemon-upgrade handoff via `child_process.spawn` `stdio` fd inheritance + — separate PR (Phase 2 of the plan). +- Windows ConPTY — not in v1 protocol; defer until Windows users justify it. diff --git a/packages/pty-daemon/build.ts b/packages/pty-daemon/build.ts new file mode 100644 index 00000000000..ff7613a8b13 --- /dev/null +++ b/packages/pty-daemon/build.ts @@ -0,0 +1,32 @@ +/** + * Bundles the pty-daemon entry point into a single JS file executable by a + * standalone Node.js runtime (matches packages/host-service/build.ts). Native + * addons (node-pty) are marked external and resolved from the desktop app's + * lib/native/ at runtime. + * + * Production: Electron spawns the daemon via process.execPath (its bundled + * Node), exactly like host-service. No Bun in the production bundle. + */ +import { existsSync, mkdirSync } from "node:fs"; + +const outdir = "dist"; +if (!existsSync(outdir)) { + mkdirSync(outdir, { recursive: true }); +} + +const result = await Bun.build({ + entrypoints: ["src/main.ts"], + target: "node", + outdir, + naming: "pty-daemon.js", + format: "esm", + external: ["node-pty"], +}); + +if (!result.success) { + console.error("[pty-daemon] build failed:"); + for (const log of result.logs) { + console.error(log); + } + process.exit(1); +} diff --git a/packages/pty-daemon/package.json b/packages/pty-daemon/package.json new file mode 100644 index 00000000000..695a085aadb --- /dev/null +++ b/packages/pty-daemon/package.json @@ -0,0 +1,39 @@ +{ + "name": "@superset/pty-daemon", + "version": "0.1.0", + "private": true, + "type": "module", + "exports": { + ".": { + "types": "./src/index.ts", + "default": "./src/index.ts" + }, + "./protocol": { + "types": "./src/protocol/index.ts", + "default": "./src/protocol/index.ts" + } + }, + "bin": { + "pty-daemon": "./src/main.ts" + }, + "engines": { + "node": ">=20" + }, + "scripts": { + "clean": "git clean -xdf .cache .turbo dist node_modules", + "start": "node --experimental-strip-types src/main.ts", + "build:daemon": "bun run build.ts", + "typecheck": "tsc --noEmit --emitDeclarationOnly false", + "test": "bun test src/protocol src/SessionStore src/handlers src/Pty/Pty.test.ts", + "test:integration": "node --experimental-strip-types --test test/integration.test.ts test/control-plane.test.ts" + }, + "dependencies": { + "node-pty": "1.1.0" + }, + "devDependencies": { + "@superset/typescript": "workspace:*", + "@types/node": "^24.9.1", + "bun-types": "^1.3.1", + "typescript": "^5.9.3" + } +} diff --git a/packages/pty-daemon/src/Pty/Pty.test.ts b/packages/pty-daemon/src/Pty/Pty.test.ts new file mode 100644 index 00000000000..0cbad51032b --- /dev/null +++ b/packages/pty-daemon/src/Pty/Pty.test.ts @@ -0,0 +1,34 @@ +import { describe, expect, test } from "bun:test"; +import { spawn } from "./Pty.ts"; + +// node-pty's runtime requires Node (Bun's tty.ReadStream handling is +// incompatible with the master fd setup). The daemon ships running under +// node; integration spawn tests live in test/integration.ts and run via +// `npm run test:integration`. Here we only cover the synchronous validation +// logic that doesn't require spawning a real PTY. + +describe("Pty wrapper (validation only — spawn behavior tested under node)", () => { + test("rejects invalid spawn dims (cols)", () => { + expect(() => + spawn({ + meta: { shell: "/bin/sh", argv: [], cols: 0, rows: 24 }, + }), + ).toThrow(/invalid cols/); + }); + + test("rejects invalid spawn dims (rows)", () => { + expect(() => + spawn({ + meta: { shell: "/bin/sh", argv: [], cols: 80, rows: 0 }, + }), + ).toThrow(/invalid rows/); + }); + + test("rejects non-integer dims", () => { + expect(() => + spawn({ + meta: { shell: "/bin/sh", argv: [], cols: 80.5, rows: 24 }, + }), + ).toThrow(/invalid cols/); + }); +}); diff --git a/packages/pty-daemon/src/Pty/Pty.ts b/packages/pty-daemon/src/Pty/Pty.ts new file mode 100644 index 00000000000..608822ac047 --- /dev/null +++ b/packages/pty-daemon/src/Pty/Pty.ts @@ -0,0 +1,84 @@ +import * as nodePty from "node-pty"; +import type { SessionMeta } from "../protocol/index.ts"; + +export type PtyOnData = (data: Buffer) => void; +export type PtyOnExit = (info: { + code: number | null; + signal: number | null; +}) => void; + +export interface Pty { + readonly pid: number; + readonly meta: SessionMeta; + write(data: Buffer): void; + resize(cols: number, rows: number): void; + kill(signal?: NodeJS.Signals): void; + onData(cb: PtyOnData): void; + onExit(cb: PtyOnExit): void; +} + +export interface SpawnOptions { + meta: SessionMeta; +} + +class NodePtyAdapter implements Pty { + readonly pid: number; + meta: SessionMeta; + private term: nodePty.IPty; + + constructor(term: nodePty.IPty, meta: SessionMeta) { + this.term = term; + this.pid = term.pid; + this.meta = meta; + } + + write(data: Buffer): void { + // node-pty's write accepts strings or buffers; pass buffer to keep bytes intact. + this.term.write(data as unknown as string); + } + + resize(cols: number, rows: number): void { + validateDims(cols, rows); + this.term.resize(cols, rows); + this.meta = { ...this.meta, cols, rows }; + } + + kill(signal?: NodeJS.Signals): void { + this.term.kill(signal); + } + + onData(cb: PtyOnData): void { + this.term.onData((d) => { + cb(typeof d === "string" ? Buffer.from(d, "utf8") : d); + }); + } + + onExit(cb: PtyOnExit): void { + this.term.onExit(({ exitCode, signal }) => { + cb({ code: exitCode ?? null, signal: signal ?? null }); + }); + } +} + +function validateDims(cols: number, rows: number): void { + if (!Number.isInteger(cols) || cols <= 0) { + throw new Error(`invalid cols: ${cols}`); + } + if (!Number.isInteger(rows) || rows <= 0) { + throw new Error(`invalid rows: ${rows}`); + } +} + +export function spawn({ meta }: SpawnOptions): Pty { + validateDims(meta.cols, meta.rows); + const term = nodePty.spawn(meta.shell, meta.argv, { + name: "xterm-256color", + cols: meta.cols, + rows: meta.rows, + cwd: meta.cwd, + env: meta.env, + // node-pty's encoding defaults to utf8; we want raw bytes for fidelity. + encoding: null, + }); + return new NodePtyAdapter(term, meta); +} diff --git a/packages/pty-daemon/src/Pty/index.ts b/packages/pty-daemon/src/Pty/index.ts new file mode 100644 index 00000000000..8ce7d556c01 --- /dev/null +++ b/packages/pty-daemon/src/Pty/index.ts @@ -0,0 +1,2 @@ +export type { Pty, PtyOnData, PtyOnExit, SpawnOptions } from "./Pty.ts"; +export { spawn } from "./Pty.ts"; diff --git a/packages/pty-daemon/src/Server/Server.ts b/packages/pty-daemon/src/Server/Server.ts new file mode 100644 index 00000000000..3395c54502e --- /dev/null +++ b/packages/pty-daemon/src/Server/Server.ts @@ -0,0 +1,257 @@ +import * as fs from "node:fs"; +import * as net from "node:net"; +import * as path from "node:path"; +import type { Conn, HandlerCtx } from "../handlers/index.ts"; +import { + handleClose, + handleInput, + handleList, + handleOpen, + handleResize, + handleSubscribe, + handleUnsubscribe, +} from "../handlers/index.ts"; +import { + type ClientMessage, + CURRENT_PROTOCOL_VERSION, + encodeFrame, + FrameDecoder, + type HelloMessage, + type ServerMessage, + SUPPORTED_PROTOCOL_VERSIONS, +} from "../protocol/index.ts"; +import type { Session } from "../SessionStore/index.ts"; +import { SessionStore } from "../SessionStore/index.ts"; + +export interface ServerOptions { + socketPath: string; + daemonVersion: string; + bufferCap?: number; +} + +interface ConnState extends Conn { + socket: net.Socket; + decoder: FrameDecoder; + negotiated: number | null; +} + +export class Server { + private readonly server: net.Server; + private readonly store: SessionStore; + private readonly conns = new Set(); + private readonly opts: ServerOptions; + + constructor(opts: ServerOptions) { + this.opts = opts; + this.store = new SessionStore({ bufferCap: opts.bufferCap }); + this.server = net.createServer((socket) => this.onConnection(socket)); + } + + async listen(): Promise { + const dir = path.dirname(this.opts.socketPath); + fs.mkdirSync(dir, { recursive: true }); + // Stale-socket cleanup: remove any prior socket file at this path. + try { + fs.unlinkSync(this.opts.socketPath); + } catch (err) { + if ((err as NodeJS.ErrnoException).code !== "ENOENT") throw err; + } + await new Promise((resolve, reject) => { + this.server.once("error", reject); + this.server.listen(this.opts.socketPath, () => { + this.server.off("error", reject); + resolve(); + }); + }); + // Owner-only access. The socket file IS the auth boundary. + fs.chmodSync(this.opts.socketPath, 0o600); + } + + async close(): Promise { + for (const c of this.conns) c.socket.destroy(); + this.conns.clear(); + // Kill all owned PTYs so the daemon process can actually exit (open + // master fds keep the event loop alive). This is what the v1 lessons + // call "synchronous teardown only" — no setTimeout, no graceful drain. + for (const session of this.store.all()) { + try { + session.pty.kill("SIGKILL"); + } catch { + // already dead, ignore + } + } + await new Promise((resolve) => this.server.close(() => resolve())); + try { + fs.unlinkSync(this.opts.socketPath); + } catch { + // ignore + } + } + + private onConnection(socket: net.Socket): void { + const conn: ConnState = { + socket, + decoder: new FrameDecoder(), + negotiated: null, + subscriptions: new Set(), + send: (msg) => writeMessage(socket, msg), + }; + this.conns.add(conn); + + socket.on("data", (chunk) => { + try { + conn.decoder.push(chunk); + for (const raw of conn.decoder.drain()) { + this.dispatch(conn, raw as ClientMessage); + } + } catch (err) { + conn.send({ + type: "error", + message: (err as Error).message, + code: "EPROTO", + }); + socket.destroy(); + } + }); + socket.on("close", () => { + this.conns.delete(conn); + }); + socket.on("error", () => { + this.conns.delete(conn); + }); + } + + private dispatch(conn: ConnState, msg: ClientMessage): void { + // Handshake must come first. + if (conn.negotiated === null) { + if (msg.type !== "hello") { + conn.send({ type: "error", message: "expected hello", code: "EPROTO" }); + conn.socket.destroy(); + return; + } + const negotiated = pickProtocol(msg); + if (negotiated === null) { + conn.send({ + type: "error", + message: `no compatible protocol; daemon supports ${SUPPORTED_PROTOCOL_VERSIONS.join(",")}`, + code: "EVERSION", + }); + conn.socket.destroy(); + return; + } + conn.negotiated = negotiated; + conn.send({ + type: "hello-ack", + protocol: negotiated, + daemonVersion: this.opts.daemonVersion, + }); + return; + } + + const ctx = this.handlerCtx(); + switch (msg.type) { + case "hello": { + conn.send({ + type: "error", + message: "duplicate hello", + code: "EPROTO", + }); + return; + } + case "open": { + conn.send(handleOpen(ctx, msg)); + return; + } + case "input": { + const reply = handleInput(ctx, msg); + if (reply) conn.send(reply); + return; + } + case "resize": { + const reply = handleResize(ctx, msg); + if (reply) conn.send(reply); + return; + } + case "close": { + conn.send(handleClose(ctx, msg)); + return; + } + case "list": { + conn.send(handleList(ctx)); + return; + } + case "subscribe": { + handleSubscribe(ctx, conn, msg); + return; + } + case "unsubscribe": { + handleUnsubscribe(conn, msg); + return; + } + default: { + const t = (msg as { type: string }).type; + conn.send({ + type: "error", + message: `unknown op: ${t}`, + code: "EPROTO", + }); + return; + } + } + } + + private handlerCtx(): HandlerCtx { + return { + store: this.store, + wireSession: (session) => this.wireSession(session), + }; + } + + /** + * Pipe the session's PTY events into the broadcast set: any connection + * subscribed to this session id receives the output / exit frames. + */ + private wireSession(session: Session): void { + session.pty.onData((chunk) => { + this.store.appendOutput(session, chunk); + const out: ServerMessage = { + type: "output", + id: session.id, + data: chunk.toString("base64"), + }; + for (const c of this.conns) { + if (c.subscriptions.has(session.id)) c.send(out); + } + }); + session.pty.onExit((info) => { + session.exited = true; + session.exitCode = info.code; + session.exitSignal = info.signal; + const ev: ServerMessage = { + type: "exit", + id: session.id, + code: info.code, + signal: info.signal, + }; + for (const c of this.conns) { + if (c.subscriptions.has(session.id)) c.send(ev); + } + // Keep the session row around briefly so a late subscriber can still + // fetch the buffer; we delete on next list/close. + }); + } +} + +function pickProtocol(hello: HelloMessage): number | null { + const supported = new Set(SUPPORTED_PROTOCOL_VERSIONS); + let best: number | null = null; + for (const v of hello.protocols) { + if (supported.has(v) && (best === null || v > best)) best = v; + } + return best ?? (supported.has(CURRENT_PROTOCOL_VERSION) ? null : null); +} + +function writeMessage(socket: net.Socket, msg: ServerMessage): void { + if (socket.destroyed) return; + socket.write(encodeFrame(msg)); +} diff --git a/packages/pty-daemon/src/Server/index.ts b/packages/pty-daemon/src/Server/index.ts new file mode 100644 index 00000000000..0126739712f --- /dev/null +++ b/packages/pty-daemon/src/Server/index.ts @@ -0,0 +1 @@ +export { Server, type ServerOptions } from "./Server.ts"; diff --git a/packages/pty-daemon/src/SessionStore/SessionStore.test.ts b/packages/pty-daemon/src/SessionStore/SessionStore.test.ts new file mode 100644 index 00000000000..19d6da4c989 --- /dev/null +++ b/packages/pty-daemon/src/SessionStore/SessionStore.test.ts @@ -0,0 +1,82 @@ +import { describe, expect, test } from "bun:test"; +import type { Pty } from "../Pty/index.ts"; +import { SessionStore } from "./SessionStore.ts"; + +function fakePty(meta: { cols: number; rows: number }): Pty { + return { + pid: 12345, + meta: { + shell: "/bin/sh", + argv: [], + cols: meta.cols, + rows: meta.rows, + }, + write: () => {}, + resize: () => {}, + kill: () => {}, + onData: () => {}, + onExit: () => {}, + }; +} + +describe("SessionStore", () => { + test("add / get / delete", () => { + const store = new SessionStore(); + const pty = fakePty({ cols: 80, rows: 24 }); + store.add("s0", pty); + expect(store.size()).toBe(1); + expect(store.get("s0")?.id).toBe("s0"); + expect(store.delete("s0")).toBe(true); + expect(store.size()).toBe(0); + }); + + test("rejects duplicate ids", () => { + const store = new SessionStore(); + const pty = fakePty({ cols: 80, rows: 24 }); + store.add("s0", pty); + expect(() => store.add("s0", pty)).toThrow(/already exists/); + }); + + test("list reflects sessions", () => { + const store = new SessionStore(); + store.add("a", fakePty({ cols: 80, rows: 24 })); + store.add("b", fakePty({ cols: 100, rows: 30 })); + const list = store.list(); + expect(list).toHaveLength(2); + expect(list.map((s) => s.id).sort()).toEqual(["a", "b"]); + }); + + test("appendOutput accumulates within cap", () => { + const store = new SessionStore({ bufferCap: 100 }); + const session = store.add("s0", fakePty({ cols: 80, rows: 24 })); + store.appendOutput(session, Buffer.from("hello")); + store.appendOutput(session, Buffer.from(" world")); + expect(store.snapshotBuffer(session).toString()).toBe("hello world"); + expect(session.bufferBytes).toBe(11); + }); + + test("appendOutput evicts oldest chunks when exceeding cap", () => { + const store = new SessionStore({ bufferCap: 10 }); + const session = store.add("s0", fakePty({ cols: 80, rows: 24 })); + store.appendOutput(session, Buffer.from("AAAA")); // 4 + store.appendOutput(session, Buffer.from("BBBB")); // 8 + store.appendOutput(session, Buffer.from("CCCCCC")); // would be 14 → evict AAAA + const snap = store.snapshotBuffer(session).toString(); + expect(snap).toBe("BBBBCCCCCC"); + expect(session.bufferBytes).toBe(10); + }); + + test("appendOutput keeps buffer within cap across many writes", () => { + const store = new SessionStore({ bufferCap: 32 }); + const session = store.add("s0", fakePty({ cols: 80, rows: 24 })); + for (let i = 0; i < 100; i++) { + store.appendOutput( + session, + Buffer.from(`chunk${i.toString().padStart(2, "0")}-`), + ); + } + expect(session.bufferBytes).toBeLessThanOrEqual(32); + // Final chunk must always be present + expect(store.snapshotBuffer(session).toString()).toContain("chunk99-"); + }); +}); diff --git a/packages/pty-daemon/src/SessionStore/SessionStore.ts b/packages/pty-daemon/src/SessionStore/SessionStore.ts new file mode 100644 index 00000000000..40f69d96979 --- /dev/null +++ b/packages/pty-daemon/src/SessionStore/SessionStore.ts @@ -0,0 +1,104 @@ +import type { Pty } from "../Pty/index.ts"; +import type { SessionInfo } from "../protocol/index.ts"; + +const DEFAULT_BUFFER_BYTES = 64 * 1024; + +export interface Session { + id: string; + pty: Pty; + /** ring buffer for replay-on-attach; in-memory only, never persisted. */ + buffer: Buffer[]; + bufferBytes: number; + bufferCap: number; + exited: boolean; + exitCode: number | null; + exitSignal: number | null; +} + +export interface SessionStoreOptions { + bufferCap?: number; +} + +/** + * In-memory map of active sessions. Daemon-local state; nothing is persisted. + * + * Replay buffer is a circular FIFO of byte chunks per session, capped by + * total byte size. When new output exceeds the cap, oldest chunks are + * dropped (head). The cap is small (~64 KB) — enough to redraw a typical + * shell screen on attach. Larger scrollback is the renderer's xterm.js + * responsibility. + */ +export class SessionStore { + private readonly sessions = new Map(); + private readonly bufferCap: number; + + constructor(opts: SessionStoreOptions = {}) { + this.bufferCap = opts.bufferCap ?? DEFAULT_BUFFER_BYTES; + } + + add(id: string, pty: Pty): Session { + if (this.sessions.has(id)) { + throw new Error(`session already exists: ${id}`); + } + const session: Session = { + id, + pty, + buffer: [], + bufferBytes: 0, + bufferCap: this.bufferCap, + exited: false, + exitCode: null, + exitSignal: null, + }; + this.sessions.set(id, session); + return session; + } + + get(id: string): Session | undefined { + return this.sessions.get(id); + } + + delete(id: string): boolean { + return this.sessions.delete(id); + } + + list(): SessionInfo[] { + const out: SessionInfo[] = []; + for (const s of this.sessions.values()) { + out.push({ + id: s.id, + pid: s.pty.pid, + cols: s.pty.meta.cols, + rows: s.pty.meta.rows, + alive: !s.exited, + }); + } + return out; + } + + all(): IterableIterator { + return this.sessions.values(); + } + + size(): number { + return this.sessions.size; + } + + /** Append output to a session's ring buffer; evict oldest chunks past the cap. */ + appendOutput(session: Session, chunk: Buffer): void { + session.buffer.push(chunk); + session.bufferBytes += chunk.byteLength; + while ( + session.bufferBytes > session.bufferCap && + session.buffer.length > 0 + ) { + const head = session.buffer.shift(); + if (head) session.bufferBytes -= head.byteLength; + } + } + + /** Snapshot the buffered bytes for replay; doesn't clear the buffer. */ + snapshotBuffer(session: Session): Buffer { + return Buffer.concat(session.buffer); + } +} diff --git a/packages/pty-daemon/src/SessionStore/index.ts b/packages/pty-daemon/src/SessionStore/index.ts new file mode 100644 index 00000000000..0e7e0400010 --- /dev/null +++ b/packages/pty-daemon/src/SessionStore/index.ts @@ -0,0 +1,2 @@ +export type { Session, SessionStoreOptions } from "./SessionStore.ts"; +export { SessionStore } from "./SessionStore.ts"; diff --git a/packages/pty-daemon/src/handlers/handlers.test.ts b/packages/pty-daemon/src/handlers/handlers.test.ts new file mode 100644 index 00000000000..c0c97975c1d --- /dev/null +++ b/packages/pty-daemon/src/handlers/handlers.test.ts @@ -0,0 +1,216 @@ +import { beforeEach, describe, expect, test } from "bun:test"; +import type { Pty, SpawnOptions } from "../Pty/index.ts"; +import type { ServerMessage } from "../protocol/index.ts"; +import { SessionStore } from "../SessionStore/index.ts"; +import type { Conn, HandlerCtx } from "./handlers.ts"; +import { + handleClose, + handleInput, + handleList, + handleOpen, + handleResize, + handleSubscribe, + handleUnsubscribe, +} from "./handlers.ts"; + +interface FakePtyState { + pid: number; + cols: number; + rows: number; + written: Buffer[]; + killed: boolean; +} + +function makeFakePty(state: FakePtyState, meta: SpawnOptions["meta"]): Pty { + state.cols = meta.cols; + state.rows = meta.rows; + return { + pid: state.pid, + meta, + write: (b) => state.written.push(b), + resize: (c, r) => { + state.cols = c; + state.rows = r; + }, + kill: () => { + state.killed = true; + }, + onData: () => {}, + onExit: () => {}, + }; +} + +function makeConn(): Conn & { sent: ServerMessage[] } { + const sent: ServerMessage[] = []; + return { + sent, + subscriptions: new Set(), + send: (m) => sent.push(m), + }; +} + +let nextPid = 1000; +let states: FakePtyState[] = []; +let wired: Pty[] = []; + +function makeCtx(): HandlerCtx & { + spawnedStates: FakePtyState[]; + wired: Pty[]; +} { + const store = new SessionStore(); + return { + store, + spawnedStates: states, + wired, + wireSession: (s) => { + wired.push(s.pty); + }, + spawnPty: (opts) => { + const state: FakePtyState = { + pid: nextPid++, + cols: opts.meta.cols, + rows: opts.meta.rows, + written: [], + killed: false, + }; + states.push(state); + return makeFakePty(state, opts.meta); + }, + }; +} + +beforeEach(() => { + nextPid = 1000; + states = []; + wired = []; +}); + +describe("handlers", () => { + test("open: spawns a session and replies open-ok", () => { + const ctx = makeCtx(); + const reply = handleOpen(ctx, { + type: "open", + id: "s0", + meta: { shell: "/bin/sh", argv: [], cols: 80, rows: 24 }, + }); + expect(reply.type).toBe("open-ok"); + if (reply.type === "open-ok") expect(reply.pid).toBe(1000); + expect(ctx.store.size()).toBe(1); + expect(ctx.wired).toHaveLength(1); + }); + + test("open: rejects duplicate ids", () => { + const ctx = makeCtx(); + const meta = { shell: "/bin/sh", argv: [], cols: 80, rows: 24 }; + handleOpen(ctx, { type: "open", id: "s0", meta }); + const reply = handleOpen(ctx, { type: "open", id: "s0", meta }); + expect(reply.type).toBe("error"); + }); + + test("input writes bytes to the pty", () => { + const ctx = makeCtx(); + handleOpen(ctx, { + type: "open", + id: "s0", + meta: { shell: "/bin/sh", argv: [], cols: 80, rows: 24 }, + }); + const result = handleInput(ctx, { + type: "input", + id: "s0", + data: Buffer.from("hello").toString("base64"), + }); + expect(result).toBeUndefined(); + expect(states[0]?.written.map((b) => b.toString())).toEqual(["hello"]); + }); + + test("input on missing session returns error", () => { + const ctx = makeCtx(); + const result = handleInput(ctx, { + type: "input", + id: "missing", + data: "", + }); + expect(result?.type).toBe("error"); + }); + + test("resize updates dims", () => { + const ctx = makeCtx(); + handleOpen(ctx, { + type: "open", + id: "s0", + meta: { shell: "/bin/sh", argv: [], cols: 80, rows: 24 }, + }); + expect( + handleResize(ctx, { type: "resize", id: "s0", cols: 100, rows: 30 }), + ).toBeUndefined(); + expect(states[0]?.cols).toBe(100); + expect(states[0]?.rows).toBe(30); + }); + + test("close kills the pty and replies closed", () => { + const ctx = makeCtx(); + handleOpen(ctx, { + type: "open", + id: "s0", + meta: { shell: "/bin/sh", argv: [], cols: 80, rows: 24 }, + }); + const reply = handleClose(ctx, { type: "close", id: "s0" }); + expect(reply.type).toBe("closed"); + expect(states[0]?.killed).toBe(true); + }); + + test("list returns all sessions", () => { + const ctx = makeCtx(); + const meta = { shell: "/bin/sh", argv: [], cols: 80, rows: 24 }; + handleOpen(ctx, { type: "open", id: "a", meta }); + handleOpen(ctx, { type: "open", id: "b", meta }); + const reply = handleList(ctx); + expect(reply.sessions).toHaveLength(2); + }); + + test("subscribe with replay sends buffered output", () => { + const ctx = makeCtx(); + handleOpen(ctx, { + type: "open", + id: "s0", + meta: { shell: "/bin/sh", argv: [], cols: 80, rows: 24 }, + }); + const session = ctx.store.get("s0"); + if (!session) throw new Error("no session"); + ctx.store.appendOutput(session, Buffer.from("prior bytes")); + + const conn = makeConn(); + handleSubscribe(ctx, conn, { type: "subscribe", id: "s0", replay: true }); + expect(conn.subscriptions.has("s0")).toBe(true); + expect(conn.sent).toHaveLength(1); + const m = conn.sent[0]; + expect(m?.type).toBe("output"); + if (m?.type === "output") { + expect(Buffer.from(m.data, "base64").toString()).toBe("prior bytes"); + } + }); + + test("subscribe without replay does not send buffered output", () => { + const ctx = makeCtx(); + handleOpen(ctx, { + type: "open", + id: "s0", + meta: { shell: "/bin/sh", argv: [], cols: 80, rows: 24 }, + }); + const session = ctx.store.get("s0"); + if (!session) throw new Error("no session"); + ctx.store.appendOutput(session, Buffer.from("prior bytes")); + + const conn = makeConn(); + handleSubscribe(ctx, conn, { type: "subscribe", id: "s0", replay: false }); + expect(conn.subscriptions.has("s0")).toBe(true); + expect(conn.sent).toHaveLength(0); + }); + + test("unsubscribe removes from conn.subscriptions", () => { + const conn = makeConn(); + conn.subscriptions.add("s0"); + handleUnsubscribe(conn, { type: "unsubscribe", id: "s0" }); + expect(conn.subscriptions.has("s0")).toBe(false); + }); +}); diff --git a/packages/pty-daemon/src/handlers/handlers.ts b/packages/pty-daemon/src/handlers/handlers.ts new file mode 100644 index 00000000000..c752982493e --- /dev/null +++ b/packages/pty-daemon/src/handlers/handlers.ts @@ -0,0 +1,148 @@ +import { + spawn as defaultSpawn, + type Pty, + type SpawnOptions, +} from "../Pty/index.ts"; +import type { + CloseMessage, + InputMessage, + ListReplyMessage, + OpenMessage, + OpenOkMessage, + OutputMessage, + ResizeMessage, + ServerMessage, + SubscribeMessage, + UnsubscribeMessage, +} from "../protocol/index.ts"; +import type { Session, SessionStore } from "../SessionStore/index.ts"; + +/** + * Per-connection state owned by the Server. Handlers receive a Conn ref to + * read/write subscription membership and to send messages. + */ +export interface Conn { + subscriptions: Set; + send(message: ServerMessage): void; +} + +/** + * Wire a freshly-created session's PTY events into the broadcast pipeline. + * Called once at session-open time. The Server owns the broadcast set. + */ +export type SessionWirer = (session: Session) => void; + +export interface HandlerCtx { + store: SessionStore; + wireSession: SessionWirer; + /** Pluggable spawn for testability; defaults to real node-pty in production. */ + spawnPty?: (opts: SpawnOptions) => Pty; +} + +export function handleOpen(ctx: HandlerCtx, msg: OpenMessage): ServerMessage { + if (ctx.store.get(msg.id)) { + return errorFor(msg.id, `session already exists: ${msg.id}`, "EEXIST"); + } + let session: Session; + const spawnFn = ctx.spawnPty ?? defaultSpawn; + try { + const pty = spawnFn({ meta: msg.meta }); + session = ctx.store.add(msg.id, pty); + } catch (err) { + return errorFor(msg.id, (err as Error).message, "ESPAWN"); + } + ctx.wireSession(session); + const reply: OpenOkMessage = { + type: "open-ok", + id: msg.id, + pid: session.pty.pid, + }; + return reply; +} + +export function handleInput( + ctx: HandlerCtx, + msg: InputMessage, +): ServerMessage | undefined { + const session = ctx.store.get(msg.id); + if (!session) return errorFor(msg.id, `unknown session: ${msg.id}`, "ENOENT"); + if (session.exited) + return errorFor(msg.id, `session exited: ${msg.id}`, "EEXITED"); + try { + session.pty.write(Buffer.from(msg.data, "base64")); + } catch (err) { + return errorFor(msg.id, (err as Error).message, "EWRITE"); + } + return undefined; +} + +export function handleResize( + ctx: HandlerCtx, + msg: ResizeMessage, +): ServerMessage | undefined { + const session = ctx.store.get(msg.id); + if (!session) return errorFor(msg.id, `unknown session: ${msg.id}`, "ENOENT"); + try { + session.pty.resize(msg.cols, msg.rows); + } catch (err) { + return errorFor(msg.id, (err as Error).message, "ERESIZE"); + } + return undefined; +} + +export function handleClose(ctx: HandlerCtx, msg: CloseMessage): ServerMessage { + const session = ctx.store.get(msg.id); + if (!session) return errorFor(msg.id, `unknown session: ${msg.id}`, "ENOENT"); + try { + session.pty.kill(msg.signal ?? "SIGTERM"); + } catch (err) { + return errorFor(msg.id, (err as Error).message, "EKILL"); + } + return { type: "closed", id: msg.id }; +} + +export function handleList(ctx: HandlerCtx): ListReplyMessage { + return { type: "list-reply", sessions: ctx.store.list() }; +} + +/** + * Subscribe the connection to a session. If `replay` is true, immediately + * send an `output` frame containing the buffered bytes before live streaming + * begins. Live streaming is the Server's job once `subscriptions` includes + * this session id. + */ +export function handleSubscribe( + ctx: HandlerCtx, + conn: Conn, + msg: SubscribeMessage, +): void { + const session = ctx.store.get(msg.id); + if (!session) { + conn.send(errorFor(msg.id, `unknown session: ${msg.id}`, "ENOENT")); + return; + } + conn.subscriptions.add(msg.id); + if (msg.replay) { + const snap = ctx.store.snapshotBuffer(session); + if (snap.byteLength > 0) { + const out: OutputMessage = { + type: "output", + id: msg.id, + data: snap.toString("base64"), + }; + conn.send(out); + } + } +} + +export function handleUnsubscribe(conn: Conn, msg: UnsubscribeMessage): void { + conn.subscriptions.delete(msg.id); +} + +function errorFor( + id: string | undefined, + message: string, + code?: string, +): ServerMessage { + return { type: "error", id, message, code }; +} diff --git a/packages/pty-daemon/src/handlers/index.ts b/packages/pty-daemon/src/handlers/index.ts new file mode 100644 index 00000000000..97b7935d62a --- /dev/null +++ b/packages/pty-daemon/src/handlers/index.ts @@ -0,0 +1,10 @@ +export type { Conn, HandlerCtx, SessionWirer } from "./handlers.ts"; +export { + handleClose, + handleInput, + handleList, + handleOpen, + handleResize, + handleSubscribe, + handleUnsubscribe, +} from "./handlers.ts"; diff --git a/packages/pty-daemon/src/index.ts b/packages/pty-daemon/src/index.ts new file mode 100644 index 00000000000..ec0b81b0324 --- /dev/null +++ b/packages/pty-daemon/src/index.ts @@ -0,0 +1,7 @@ +// Public package surface — host-service imports from "@superset/pty-daemon" or +// "@superset/pty-daemon/protocol". Daemon implementation runtime is Node; +// host-service is a CLIENT of the daemon (importing protocol types only), +// not a runtime peer. + +export { Server, type ServerOptions } from "./Server/index.ts"; +export type { Session } from "./SessionStore/index.ts"; diff --git a/packages/pty-daemon/src/main.ts b/packages/pty-daemon/src/main.ts new file mode 100644 index 00000000000..d40bd03fe1e --- /dev/null +++ b/packages/pty-daemon/src/main.ts @@ -0,0 +1,77 @@ +#!/usr/bin/env node +// pty-daemon entrypoint. Runs under Node (node-pty + Bun's tty.ReadStream +// don't get along; see the design doc). +// +// Usage: +// pty-daemon --socket=/path/to/sock [--buffer-bytes=65536] +// +// Logs go to stderr; nothing on stdout. + +import * as fs from "node:fs"; +import * as os from "node:os"; +import * as path from "node:path"; +import { fileURLToPath } from "node:url"; +import { Server } from "./Server/index.ts"; + +interface CliArgs { + socket: string; + bufferBytes?: number; +} + +function parseArgs(argv: string[]): CliArgs { + const args: Partial = {}; + for (const arg of argv) { + if (arg.startsWith("--socket=")) + args.socket = arg.slice("--socket=".length); + else if (arg.startsWith("--buffer-bytes=")) { + args.bufferBytes = Number.parseInt( + arg.slice("--buffer-bytes=".length), + 10, + ); + } + } + if (!args.socket) { + throw new Error("--socket=PATH is required"); + } + return args as CliArgs; +} + +async function main(): Promise { + const args = parseArgs(process.argv.slice(2)); + const daemonVersion = readPackageVersion(); + const server = new Server({ + socketPath: args.socket, + daemonVersion, + bufferCap: args.bufferBytes, + }); + await server.listen(); + process.stderr.write( + `[pty-daemon] listening on ${args.socket} (v${daemonVersion}, host=${os.hostname()})\n`, + ); + + const shutdown = async (signal: NodeJS.Signals) => { + process.stderr.write(`[pty-daemon] received ${signal}, shutting down\n`); + await server.close(); + process.exit(0); + }; + process.on("SIGINT", () => void shutdown("SIGINT")); + process.on("SIGTERM", () => void shutdown("SIGTERM")); +} + +function readPackageVersion(): string { + try { + const here = path.dirname(fileURLToPath(import.meta.url)); + const pkgPath = path.resolve(here, "..", "package.json"); + const pkg = JSON.parse(fs.readFileSync(pkgPath, "utf8")) as { + version?: string; + }; + return pkg.version ?? "0.0.0"; + } catch { + return "0.0.0"; + } +} + +main().catch((err) => { + process.stderr.write(`[pty-daemon] fatal: ${(err as Error).stack ?? err}\n`); + process.exit(1); +}); diff --git a/packages/pty-daemon/src/protocol/framing.test.ts b/packages/pty-daemon/src/protocol/framing.test.ts new file mode 100644 index 00000000000..0d3c947c766 --- /dev/null +++ b/packages/pty-daemon/src/protocol/framing.test.ts @@ -0,0 +1,49 @@ +import { describe, expect, test } from "bun:test"; +import { decodeFrame, encodeFrame, FrameDecoder } from "./framing.ts"; + +describe("framing", () => { + test("round-trips a simple object", () => { + const msg = { type: "hello", protocols: [1] }; + const frame = encodeFrame(msg); + expect(decodeFrame(frame)).toEqual(msg); + }); + + test("round-trips through FrameDecoder", () => { + const a = { type: "open", id: "s0" }; + const b = { type: "input", id: "s0", data: "aGk=" }; + const dec = new FrameDecoder(); + dec.push(Buffer.concat([encodeFrame(a), encodeFrame(b)])); + expect(dec.drain()).toEqual([a, b]); + }); + + test("FrameDecoder buffers across chunks", () => { + const msg = { type: "open", id: "s0" }; + const full = encodeFrame(msg); + const dec = new FrameDecoder(); + dec.push(full.subarray(0, 2)); + expect(dec.drain()).toEqual([]); + dec.push(full.subarray(2, 6)); + expect(dec.drain()).toEqual([]); + dec.push(full.subarray(6)); + expect(dec.drain()).toEqual([msg]); + }); + + test("FrameDecoder handles partial frame after a complete one", () => { + const a = { type: "open", id: "s0" }; + const b = { type: "open", id: "s1" }; + const buf = Buffer.concat([encodeFrame(a), encodeFrame(b)]); + const dec = new FrameDecoder(); + dec.push(buf.subarray(0, encodeFrame(a).length + 3)); + expect(dec.drain()).toEqual([a]); + dec.push(buf.subarray(encodeFrame(a).length + 3)); + expect(dec.drain()).toEqual([b]); + }); + + test("rejects oversized frames", () => { + const bigHeader = Buffer.alloc(4); + bigHeader.writeUInt32BE(20 * 1024 * 1024, 0); // 20 MB + const dec = new FrameDecoder(); + dec.push(bigHeader); + expect(() => dec.drain()).toThrow(/frame too large/); + }); +}); diff --git a/packages/pty-daemon/src/protocol/framing.ts b/packages/pty-daemon/src/protocol/framing.ts new file mode 100644 index 00000000000..df678a0b76d --- /dev/null +++ b/packages/pty-daemon/src/protocol/framing.ts @@ -0,0 +1,56 @@ +// Length-prefixed binary frames over a SOCK_STREAM socket. +// +// Wire: [u32 BE length][JSON UTF-8 payload of that length] + +const HEADER_BYTES = 4; +const MAX_FRAME_BYTES = 8 * 1024 * 1024; // 8 MB hard cap; abort the connection above this. + +export function encodeFrame(message: unknown): Buffer { + const json = JSON.stringify(message); + const payload = Buffer.from(json, "utf8"); + const header = Buffer.alloc(HEADER_BYTES); + header.writeUInt32BE(payload.byteLength, 0); + return Buffer.concat([header, payload]); +} + +/** + * Streaming decoder. Feed bytes via `push`; iterate completed frames via `drain`. + * Throws on oversized frames so a malformed peer can't exhaust memory. + */ +export class FrameDecoder { + private buf: Buffer = Buffer.alloc(0); + + push(chunk: Buffer): void { + this.buf = this.buf.length === 0 ? chunk : Buffer.concat([this.buf, chunk]); + } + + drain(): unknown[] { + const out: unknown[] = []; + while (this.buf.length >= HEADER_BYTES) { + const len = this.buf.readUInt32BE(0); + if (len > MAX_FRAME_BYTES) { + throw new Error(`frame too large: ${len} bytes`); + } + if (this.buf.length < HEADER_BYTES + len) break; + const payload = this.buf.subarray(HEADER_BYTES, HEADER_BYTES + len); + out.push(JSON.parse(payload.toString("utf8"))); + this.buf = this.buf.subarray(HEADER_BYTES + len); + } + return out; + } +} + +/** + * One-shot decode of a buffer that contains exactly one complete frame. + * Used by tests; production reads use FrameDecoder. + */ +export function decodeFrame(buf: Buffer): unknown { + if (buf.length < HEADER_BYTES) throw new Error("short frame"); + const len = buf.readUInt32BE(0); + if (buf.length !== HEADER_BYTES + len) { + throw new Error( + `frame length mismatch: header=${len} buf=${buf.length - HEADER_BYTES}`, + ); + } + return JSON.parse(buf.subarray(HEADER_BYTES).toString("utf8")); +} diff --git a/packages/pty-daemon/src/protocol/index.ts b/packages/pty-daemon/src/protocol/index.ts new file mode 100644 index 00000000000..dd5bf0788e9 --- /dev/null +++ b/packages/pty-daemon/src/protocol/index.ts @@ -0,0 +1,26 @@ +export { decodeFrame, encodeFrame, FrameDecoder } from "./framing.ts"; +export type { + ClientMessage, + ClosedMessage, + CloseMessage, + ErrorMessage, + ExitMessage, + HelloAckMessage, + HelloMessage, + InputMessage, + ListMessage, + ListReplyMessage, + OpenMessage, + OpenOkMessage, + OutputMessage, + ResizeMessage, + ServerMessage, + SessionInfo, + SessionMeta, + SubscribeMessage, + UnsubscribeMessage, +} from "./messages.ts"; +export { + CURRENT_PROTOCOL_VERSION, + SUPPORTED_PROTOCOL_VERSIONS, +} from "./version.ts"; diff --git a/packages/pty-daemon/src/protocol/messages.ts b/packages/pty-daemon/src/protocol/messages.ts new file mode 100644 index 00000000000..dbb26d5f7e9 --- /dev/null +++ b/packages/pty-daemon/src/protocol/messages.ts @@ -0,0 +1,140 @@ +// Message schemas for the pty-daemon Unix socket protocol. +// +// Wire format: 4-byte big-endian length prefix + UTF-8 JSON payload. +// Binary data (PTY input/output) travels base64-encoded inside the JSON. +// See ../README.md and ../../../../apps/desktop/plans/20260429-pty-daemon-implementation.md + +export interface SessionMeta { + shell: string; + argv: string[]; + cwd?: string; + env?: Record; + cols: number; + rows: number; +} + +export interface SessionInfo { + id: string; + pid: number; + cols: number; + rows: number; + alive: boolean; +} + +// ---------- Handshake ---------- + +export interface HelloMessage { + type: "hello"; + protocols: number[]; + clientVersion?: string; +} + +export interface HelloAckMessage { + type: "hello-ack"; + protocol: number; + daemonVersion: string; +} + +// ---------- Client -> Daemon ---------- + +export interface OpenMessage { + type: "open"; + id: string; + meta: SessionMeta; +} + +export interface InputMessage { + type: "input"; + id: string; + /** base64-encoded bytes */ + data: string; +} + +export interface ResizeMessage { + type: "resize"; + id: string; + cols: number; + rows: number; +} + +export interface CloseMessage { + type: "close"; + id: string; + signal?: "SIGINT" | "SIGTERM" | "SIGKILL" | "SIGHUP"; +} + +export interface ListMessage { + type: "list"; +} + +export interface SubscribeMessage { + type: "subscribe"; + id: string; + /** if true, replay buffered output before live streaming */ + replay: boolean; +} + +export interface UnsubscribeMessage { + type: "unsubscribe"; + id: string; +} + +// ---------- Daemon -> Client ---------- + +export interface OpenOkMessage { + type: "open-ok"; + id: string; + pid: number; +} + +export interface OutputMessage { + type: "output"; + id: string; + /** base64-encoded bytes */ + data: string; +} + +export interface ExitMessage { + type: "exit"; + id: string; + code: number | null; + signal: number | null; +} + +export interface ClosedMessage { + type: "closed"; + id: string; +} + +export interface ListReplyMessage { + type: "list-reply"; + sessions: SessionInfo[]; +} + +export interface ErrorMessage { + type: "error"; + id?: string; + message: string; + code?: string; +} + +// ---------- Unions ---------- + +export type ClientMessage = + | HelloMessage + | OpenMessage + | InputMessage + | ResizeMessage + | CloseMessage + | ListMessage + | SubscribeMessage + | UnsubscribeMessage; + +export type ServerMessage = + | HelloAckMessage + | OpenOkMessage + | OutputMessage + | ExitMessage + | ClosedMessage + | ListReplyMessage + | ErrorMessage; diff --git a/packages/pty-daemon/src/protocol/version.ts b/packages/pty-daemon/src/protocol/version.ts new file mode 100644 index 00000000000..350a807f332 --- /dev/null +++ b/packages/pty-daemon/src/protocol/version.ts @@ -0,0 +1,4 @@ +// Protocol versioning. Increment on breaking changes; add to SUPPORTED list +// while we still need to interop with the previous major during rollouts. +export const CURRENT_PROTOCOL_VERSION = 1 as const; +export const SUPPORTED_PROTOCOL_VERSIONS: readonly number[] = [1]; diff --git a/packages/pty-daemon/test/control-plane.test.ts b/packages/pty-daemon/test/control-plane.test.ts new file mode 100644 index 00000000000..659ecbdb55a --- /dev/null +++ b/packages/pty-daemon/test/control-plane.test.ts @@ -0,0 +1,734 @@ +// Comprehensive control-plane test for pty-daemon. Each test exercises a +// real daemon over a real Unix socket and walks through one usage pattern +// end-to-end. Together these cover every usage shape host-service can throw +// at the daemon: handshake variants, session lifecycle, I/O patterns, +// multi-client subscribe/replay/unsubscribe, detach+reattach, malformed +// input, late subscribers, concurrent N sessions, shutdown. +// +// Runs under Node (`node --experimental-strip-types --test`). + +import { strict as assert } from "node:assert"; +import * as os from "node:os"; +import * as path from "node:path"; +import { after, before, describe, test } from "node:test"; +import { encodeFrame } from "../src/protocol/index.ts"; +import { Server } from "../src/Server/index.ts"; +import { connect, connectAndHello } from "./helpers/client.ts"; + +const sockPath = path.join( + os.tmpdir(), + `pty-daemon-control-${process.pid}.sock`, +); +let server: Server; + +before(async () => { + server = new Server({ + socketPath: sockPath, + daemonVersion: "0.0.0-control", + bufferCap: 8 * 1024, + }); + await server.listen(); +}); + +after(async () => { + await server.close(); +}); + +const SH = "/bin/sh"; +const baseMeta = { + shell: SH, + argv: ["-c", "echo ready; sleep 5"] as string[], + cols: 80, + rows: 24, +}; + +function uniqueId(prefix: string): string { + return `${prefix}-${Math.random().toString(36).slice(2, 8)}`; +} + +// ---------------- Handshake ---------------- + +describe("handshake", () => { + test("rejects non-hello first message", async () => { + const c = await connect(sockPath); + c.send({ type: "list" }); + const err = await c.waitFor((m) => m.type === "error", 1000); + assert.equal(err.type, "error"); + await c.close(); + }); + + test("rejects unsupported protocol versions", async () => { + const c = await connect(sockPath); + c.send({ type: "hello", protocols: [99, 100] }); + const err = await c.waitFor((m) => m.type === "error", 1000); + if (err.type === "error") assert.equal(err.code, "EVERSION"); + await c.close(); + }); + + test("picks highest mutual when multiple offered", async () => { + const c = await connect(sockPath); + c.send({ type: "hello", protocols: [1, 99] }); + const ack = await c.waitFor((m) => m.type === "hello-ack"); + if (ack.type === "hello-ack") assert.equal(ack.protocol, 1); + await c.close(); + }); + + test("rejects duplicate hello", async () => { + const c = await connectAndHello(sockPath); + c.send({ type: "hello", protocols: [1] }); + const err = await c.waitFor((m) => m.type === "error", 1000); + if (err.type === "error") { + assert.match(err.message, /duplicate hello/); + } + await c.close(); + }); +}); + +// ---------------- Session lifecycle ---------------- + +describe("session lifecycle", () => { + test("rejects open with bad cols/rows", async () => { + const c = await connectAndHello(sockPath); + c.send({ + type: "open", + id: uniqueId("badspawn"), + meta: { ...baseMeta, cols: 0 }, + }); + const err = await c.waitFor((m) => m.type === "error", 1000); + if (err.type === "error") assert.equal(err.code, "ESPAWN"); + await c.close(); + }); + + test("rejects duplicate session id", async () => { + const c = await connectAndHello(sockPath); + const id = uniqueId("dup"); + c.send({ type: "open", id, meta: baseMeta }); + await c.waitFor((m) => m.type === "open-ok"); + c.send({ type: "open", id, meta: baseMeta }); + const err = await c.waitFor((m) => m.type === "error", 1000); + if (err.type === "error") assert.equal(err.code, "EEXIST"); + c.send({ type: "close", id }); + await c.close(); + }); + + test("input/resize/close on missing session return ENOENT", async () => { + const c = await connectAndHello(sockPath); + const missing = "missing-no-such"; + + c.send({ type: "input", id: missing, data: "" }); + const e1 = await c.waitFor((m) => m.type === "error", 1000); + if (e1.type === "error") assert.equal(e1.code, "ENOENT"); + + c.send({ type: "resize", id: missing, cols: 80, rows: 24 }); + const e2 = await c.waitFor((m) => m.type === "error" && m !== e1, 1000); + if (e2.type === "error") assert.equal(e2.code, "ENOENT"); + + c.send({ type: "close", id: missing }); + const e3 = await c.waitFor( + (m) => m.type === "error" && m !== e1 && m !== e2, + 1000, + ); + if (e3.type === "error") assert.equal(e3.code, "ENOENT"); + await c.close(); + }); + + test("instant-exit shell still produces an exit message", async () => { + const c = await connectAndHello(sockPath); + const id = uniqueId("instant"); + c.send({ + type: "open", + id, + meta: { ...baseMeta, argv: ["-c", "true"] }, + }); + await c.waitFor((m) => m.type === "open-ok" && m.id === id); + c.send({ type: "subscribe", id, replay: true }); + const exit = await c.waitFor((m) => m.type === "exit" && m.id === id, 3000); + if (exit.type === "exit") assert.equal(exit.code, 0); + await c.close(); + }); + + test("close with SIGKILL terminates a hung shell", async () => { + const c = await connectAndHello(sockPath); + const id = uniqueId("hung"); + c.send({ + type: "open", + id, + meta: { ...baseMeta, argv: ["-c", "sleep 60"] }, + }); + const ok = await c.waitFor((m) => m.type === "open-ok" && m.id === id); + if (ok.type !== "open-ok") throw new Error("no open-ok"); + + c.send({ type: "subscribe", id, replay: false }); + c.send({ type: "close", id, signal: "SIGKILL" }); + await c.waitFor((m) => m.type === "closed" && m.id === id); + await c.waitFor((m) => m.type === "exit" && m.id === id, 3000); + await c.close(); + }); +}); + +// ---------------- I/O patterns ---------------- + +describe("I/O patterns", () => { + test("resize during a running shell does not break stream", async () => { + const c = await connectAndHello(sockPath); + const id = uniqueId("resize"); + c.send({ + type: "open", + id, + meta: { ...baseMeta, argv: ["-i"] }, + }); + await c.waitFor((m) => m.type === "open-ok" && m.id === id); + c.send({ type: "subscribe", id, replay: false }); + + c.send({ type: "resize", id, cols: 120, rows: 40 }); + c.send({ + type: "input", + id, + data: Buffer.from("echo post-resize-marker\n").toString("base64"), + }); + await c.waitFor( + (m) => + m.type === "output" && + m.id === id && + Buffer.from(m.data, "base64").toString().includes("post-resize-marker"), + 3000, + ); + + c.send({ type: "close", id, signal: "SIGTERM" }); + await c.close(); + }); + + test("burst output (high-rate stdout) is delivered and ring-capped", async () => { + const c = await connectAndHello(sockPath); + const id = uniqueId("burst"); + c.send({ + type: "open", + id, + meta: { + ...baseMeta, + argv: [ + "-c", + "for i in $(seq 1 200); do echo BURST:$i; done; sleep 0.5", + ], + }, + }); + await c.waitFor((m) => m.type === "open-ok" && m.id === id); + c.send({ type: "subscribe", id, replay: false }); + + // Wait until we see the last marker, confirming live delivery. + await c.waitFor( + (m) => + m.type === "output" && + m.id === id && + Buffer.from(m.data, "base64").toString().includes("BURST:200"), + 5000, + ); + await c.waitFor((m) => m.type === "exit" && m.id === id, 5000); + await c.close(); + }); + + test("multi-byte UTF-8 output round-trips", async () => { + const c = await connectAndHello(sockPath); + const id = uniqueId("utf8"); + // 🚀 = 0xF0 0x9F 0x9A 0x80 + c.send({ + type: "open", + id, + meta: { + ...baseMeta, + argv: ["-c", "printf 'rocket: \\xf0\\x9f\\x9a\\x80\\n'; sleep 0.1"], + }, + }); + await c.waitFor((m) => m.type === "open-ok" && m.id === id); + c.send({ type: "subscribe", id, replay: true }); + await c.waitFor( + (m) => + m.type === "output" && + m.id === id && + Buffer.from(m.data, "base64").toString().includes("🚀"), + 3000, + ); + await c.waitFor((m) => m.type === "exit" && m.id === id, 3000); + await c.close(); + }); +}); + +// ---------------- Multi-client subscribe / fan-out ---------------- + +describe("multi-client fan-out", () => { + test("two subscribers both receive the same output", async () => { + const a = await connectAndHello(sockPath); + const b = await connectAndHello(sockPath); + const id = uniqueId("fanout"); + + a.send({ + type: "open", + id, + meta: { ...baseMeta, argv: ["-c", "echo fanout-marker; sleep 0.5"] }, + }); + await a.waitFor((m) => m.type === "open-ok" && m.id === id); + + a.send({ type: "subscribe", id, replay: false }); + b.send({ type: "subscribe", id, replay: false }); + + await Promise.all([ + a.waitFor( + (m) => + m.type === "output" && + m.id === id && + Buffer.from(m.data, "base64").toString().includes("fanout-marker"), + 3000, + ), + b.waitFor( + (m) => + m.type === "output" && + m.id === id && + Buffer.from(m.data, "base64").toString().includes("fanout-marker"), + 3000, + ), + ]); + + await Promise.all([a.close(), b.close()]); + }); + + test("unsubscribe stops further output to that connection", async () => { + const a = await connectAndHello(sockPath); + const b = await connectAndHello(sockPath); + const id = uniqueId("unsub"); + + a.send({ + type: "open", + id, + meta: { ...baseMeta, argv: ["-i"] }, + }); + await a.waitFor((m) => m.type === "open-ok" && m.id === id); + + a.send({ type: "subscribe", id, replay: false }); + b.send({ type: "subscribe", id, replay: false }); + + // First marker — both should see it. + a.send({ + type: "input", + id, + data: Buffer.from("echo first-marker\n").toString("base64"), + }); + await Promise.all([ + a.waitFor( + (m) => + m.type === "output" && + Buffer.from(m.data, "base64").toString().includes("first-marker"), + 3000, + ), + b.waitFor( + (m) => + m.type === "output" && + Buffer.from(m.data, "base64").toString().includes("first-marker"), + 3000, + ), + ]); + + // b unsubscribes; a is still subscribed. + b.send({ type: "unsubscribe", id }); + // Small settle so the unsubscribe lands before the next emit. + await new Promise((r) => setTimeout(r, 100)); + + const bAfterUnsub = b.collect( + (m) => m.type === "output" && m.id === id, + 500, + ); + + a.send({ + type: "input", + id, + data: Buffer.from("echo second-marker\n").toString("base64"), + }); + await a.waitFor( + (m) => + m.type === "output" && + Buffer.from(m.data, "base64").toString().includes("second-marker"), + 3000, + ); + + const bMessages = await bAfterUnsub; + const sawSecondOnB = bMessages.some( + (m) => + m.type === "output" && + Buffer.from(m.data, "base64").toString().includes("second-marker"), + ); + assert.equal(sawSecondOnB, false); + + a.send({ type: "close", id, signal: "SIGTERM" }); + await Promise.all([a.close(), b.close()]); + }); + + test("subscriber connection drop doesn't crash daemon; other clients keep streaming", async () => { + const owner = await connectAndHello(sockPath); + const dropper = await connectAndHello(sockPath); + const observer = await connectAndHello(sockPath); + const id = uniqueId("dropcrash"); + + owner.send({ + type: "open", + id, + meta: { ...baseMeta, argv: ["-i"] }, + }); + await owner.waitFor((m) => m.type === "open-ok" && m.id === id); + dropper.send({ type: "subscribe", id, replay: false }); + observer.send({ type: "subscribe", id, replay: false }); + + // Force-close the dropper without unsubscribing. + dropper.socket.destroy(); + + owner.send({ + type: "input", + id, + data: Buffer.from("echo survives-drop\n").toString("base64"), + }); + await observer.waitFor( + (m) => + m.type === "output" && + Buffer.from(m.data, "base64").toString().includes("survives-drop"), + 3000, + ); + + owner.send({ type: "close", id, signal: "SIGTERM" }); + await Promise.all([owner.close(), observer.close()]); + }); +}); + +// ---------------- Detach + reattach (the headline feature) ---------------- + +describe("detach + reattach", () => { + test("late subscriber gets prior output via replay", async () => { + const owner = await connectAndHello(sockPath); + const id = uniqueId("late"); + + owner.send({ + type: "open", + id, + meta: { + ...baseMeta, + argv: ["-c", "echo early-marker; sleep 1"], + }, + }); + await owner.waitFor((m) => m.type === "open-ok" && m.id === id); + + // Wait for output to be buffered without any subscriber. + await new Promise((r) => setTimeout(r, 200)); + + const late = await connectAndHello(sockPath); + late.send({ type: "subscribe", id, replay: true }); + await late.waitFor( + (m) => + m.type === "output" && + m.id === id && + Buffer.from(m.data, "base64").toString().includes("early-marker"), + 3000, + ); + + owner.send({ type: "close", id, signal: "SIGTERM" }); + await Promise.all([owner.close(), late.close()]); + }); + + test("reattach cycle: subscribe → disconnect → new conn subscribes-with-replay → continues live", async () => { + const owner = await connectAndHello(sockPath); + const id = uniqueId("reattach"); + + owner.send({ + type: "open", + id, + meta: { ...baseMeta, argv: ["-i"] }, + }); + await owner.waitFor((m) => m.type === "open-ok" && m.id === id); + + const first = await connectAndHello(sockPath); + first.send({ type: "subscribe", id, replay: false }); + + // Generate some output via input. + owner.send({ + type: "input", + id, + data: Buffer.from("echo before-reattach\n").toString("base64"), + }); + await first.waitFor( + (m) => + m.type === "output" && + Buffer.from(m.data, "base64").toString().includes("before-reattach"), + 3000, + ); + + // Disconnect the first client. PTY keeps running. + await first.close(); + + // New client connects, asks for replay, and sends another input. + const second = await connectAndHello(sockPath); + second.send({ type: "subscribe", id, replay: true }); + // Replay should arrive immediately containing the prior output. + await second.waitFor( + (m) => + m.type === "output" && + m.id === id && + Buffer.from(m.data, "base64").toString().includes("before-reattach"), + 2000, + ); + + owner.send({ + type: "input", + id, + data: Buffer.from("echo after-reattach\n").toString("base64"), + }); + await second.waitFor( + (m) => + m.type === "output" && + m.id === id && + Buffer.from(m.data, "base64").toString().includes("after-reattach"), + 3000, + ); + + owner.send({ type: "close", id, signal: "SIGTERM" }); + await Promise.all([owner.close(), second.close()]); + }); +}); + +// ---------------- list ---------------- + +describe("list", () => { + test("reflects active sessions", async () => { + const c = await connectAndHello(sockPath); + const id = uniqueId("listed"); + c.send({ type: "open", id, meta: baseMeta }); + await c.waitFor((m) => m.type === "open-ok" && m.id === id); + + c.send({ type: "list" }); + const reply = await c.waitFor((m) => m.type === "list-reply"); + assert.equal(reply.type, "list-reply"); + if (reply.type === "list-reply") { + const found = reply.sessions.find((s) => s.id === id); + assert.ok(found, "session should appear in list"); + assert.equal(found?.cols, 80); + assert.equal(found?.rows, 24); + assert.equal(found?.alive, true); + } + + c.send({ type: "close", id, signal: "SIGTERM" }); + await c.close(); + }); +}); + +// ---------------- Malformed / abusive input ---------------- + +describe("hostile input", () => { + test("non-JSON in a frame disconnects the client; daemon survives", async () => { + const owner = await connectAndHello(sockPath); + const id = uniqueId("survive"); + owner.send({ + type: "open", + id, + meta: { ...baseMeta, argv: ["-i"] }, + }); + await owner.waitFor((m) => m.type === "open-ok" && m.id === id); + + // Hostile client sends a length-prefixed buffer of garbage that isn't JSON. + const bad = await connect(sockPath); + const garbage = Buffer.from("\x00\x00\x00\x05NOT{}"); + bad.sendRaw(garbage); + // Server should disconnect this conn cleanly. + await new Promise((res) => bad.onClose(res)); + + // Owner is still functional. + owner.send({ type: "subscribe", id, replay: false }); + owner.send({ + type: "input", + id, + data: Buffer.from("echo still-alive\n").toString("base64"), + }); + await owner.waitFor( + (m) => + m.type === "output" && + Buffer.from(m.data, "base64").toString().includes("still-alive"), + 3000, + ); + + owner.send({ type: "close", id, signal: "SIGTERM" }); + await owner.close(); + }); + + test("oversized frame header (> 8 MB cap) disconnects; daemon survives", async () => { + const bad = await connect(sockPath); + const hugeHeader = Buffer.alloc(4); + hugeHeader.writeUInt32BE(20 * 1024 * 1024, 0); + bad.sendRaw(hugeHeader); + await new Promise((res) => bad.onClose(res)); + + // Daemon is still accepting connections. + const c = await connectAndHello(sockPath); + c.send({ type: "list" }); + await c.waitFor((m) => m.type === "list-reply", 1000); + await c.close(); + }); + + test("input on already-exited session returns EEXITED", async () => { + const c = await connectAndHello(sockPath); + const id = uniqueId("dead"); + c.send({ + type: "open", + id, + meta: { ...baseMeta, argv: ["-c", "true"] }, + }); + await c.waitFor((m) => m.type === "open-ok" && m.id === id); + c.send({ type: "subscribe", id, replay: true }); + await c.waitFor((m) => m.type === "exit" && m.id === id, 3000); + + c.send({ + type: "input", + id, + data: Buffer.from("ignored").toString("base64"), + }); + const err = await c.waitFor((m) => m.type === "error", 1000); + if (err.type === "error") assert.equal(err.code, "EEXITED"); + await c.close(); + }); +}); + +// ---------------- Concurrency stress ---------------- + +describe("concurrency", () => { + test("20 sessions opened and streaming concurrently", async () => { + const c = await connectAndHello(sockPath); + const N = 20; + const ids = Array.from({ length: N }, (_, i) => uniqueId(`conc-${i}`)); + + // Open all sessions. Use a workload that runs long enough to outlast + // the open+subscribe round-trip on a busy machine — the spawns happen + // in parallel, but `subscribe replay:false` would race exits otherwise. + for (const id of ids) { + c.send({ + type: "open", + id, + meta: { + ...baseMeta, + argv: ["-c", "echo TICK:start; sleep 0.5; echo TICK:end"], + }, + }); + } + + // Wait for all open-oks. + const openIds = new Set(); + while (openIds.size < N) { + const m = await c.waitFor( + (m) => m.type === "open-ok" && !openIds.has(m.id), + 10_000, + ); + if (m.type === "open-ok") openIds.add(m.id); + } + assert.equal(openIds.size, N); + + // Subscribe with replay so even sessions whose first output landed before + // our subscribe arrives are still surfaced. + for (const id of ids) c.send({ type: "subscribe", id, replay: true }); + + // Wait for the start marker from each session. + const seen = new Set(); + while (seen.size < N) { + const m = await c.waitFor( + (m) => + m.type === "output" && + !seen.has(m.id) && + ids.includes(m.id) && + Buffer.from(m.data, "base64").toString().includes("TICK:start"), + 10_000, + ); + if (m.type === "output") seen.add(m.id); + } + assert.equal(seen.size, N); + + // Wait for all to exit. + const exited = new Set(); + while (exited.size < N) { + const m = await c.waitFor( + (m) => m.type === "exit" && !exited.has(m.id) && ids.includes(m.id), + 10_000, + ); + if (m.type === "exit") exited.add(m.id); + } + + await c.close(); + }); + + test("multiple connections opening sessions in parallel", async () => { + const N = 10; + const conns = await Promise.all( + Array.from({ length: N }, () => connectAndHello(sockPath)), + ); + + await Promise.all( + conns.map(async (c, i) => { + const id = uniqueId(`parallel-${i}`); + c.send({ + type: "open", + id, + meta: { ...baseMeta, argv: ["-c", `echo CONN:${i}; sleep 0.2`] }, + }); + await c.waitFor((m) => m.type === "open-ok" && m.id === id, 5000); + c.send({ type: "subscribe", id, replay: true }); + await c.waitFor( + (m) => + m.type === "output" && + m.id === id && + Buffer.from(m.data, "base64").toString().includes(`CONN:${i}`), + 5000, + ); + c.send({ type: "close", id, signal: "SIGTERM" }); + await c.close(); + }), + ); + }); +}); + +// ---------------- Server shutdown ---------------- + +describe("server shutdown", () => { + test("disconnects active clients cleanly via close()", async () => { + // Use a *separate* short-lived server so we don't tear down the suite's main one. + const localPath = path.join( + os.tmpdir(), + `pty-daemon-shutdown-${process.pid}-${Date.now()}.sock`, + ); + const local = new Server({ + socketPath: localPath, + daemonVersion: "0.0.0-local", + }); + await local.listen(); + + const c = await connectAndHello(localPath); + const id = uniqueId("shutdown"); + c.send({ + type: "open", + id, + meta: { ...baseMeta, argv: ["-c", "sleep 60"] }, + }); + await c.waitFor((m) => m.type === "open-ok" && m.id === id); + + const closeWaiter = new Promise((res) => c.onClose(res)); + await local.close(); + // Server.close() destroys all connections. + await closeWaiter; + assert.equal(c.closed(), true); + }); +}); + +// ---------------- Frame-level encoding sanity ---------------- + +describe("framing on the wire", () => { + test("server tolerates split frames across multiple TCP chunks", async () => { + const c = await connect(sockPath); + const hello = encodeFrame({ type: "hello", protocols: [1] }); + // Send the hello in 3-byte chunks to force the decoder to buffer. + for (let i = 0; i < hello.length; i += 3) { + c.sendRaw(hello.subarray(i, Math.min(i + 3, hello.length))); + await new Promise((r) => setTimeout(r, 1)); + } + await c.waitFor((m) => m.type === "hello-ack", 1000); + await c.close(); + }); +}); diff --git a/packages/pty-daemon/test/helpers/client.ts b/packages/pty-daemon/test/helpers/client.ts new file mode 100644 index 00000000000..3a1b548bebe --- /dev/null +++ b/packages/pty-daemon/test/helpers/client.ts @@ -0,0 +1,151 @@ +// Reusable test client for pty-daemon integration tests. +// Speaks the daemon's wire protocol over a Unix socket. + +import * as net from "node:net"; +import { + encodeFrame, + FrameDecoder, + type ServerMessage, +} from "../../src/protocol/index.ts"; + +export interface DaemonClient { + socket: net.Socket; + messages: ServerMessage[]; + send(m: unknown): void; + waitFor( + predicate: (m: ServerMessage) => boolean, + ms?: number, + ): Promise; + collect( + predicate: (m: ServerMessage) => boolean, + ms: number, + ): Promise; + sendRaw(buf: Buffer): void; + close(): Promise; + closed(): boolean; + onClose(cb: () => void): void; +} + +interface Waiter { + predicate: (m: ServerMessage) => boolean; + resolve: (m: ServerMessage) => void; + reject: (e: Error) => void; + timer: NodeJS.Timeout; +} + +export function connect(socketPath: string): Promise { + return new Promise((resolve, reject) => { + const socket = net.createConnection({ path: socketPath }); + const decoder = new FrameDecoder(); + const messages: ServerMessage[] = []; + const waiters: Waiter[] = []; + const closeCbs: Array<() => void> = []; + let isClosed = false; + + socket.on("data", (chunk) => { + try { + decoder.push(chunk); + for (const raw of decoder.drain()) { + const m = raw as ServerMessage; + messages.push(m); + for (let i = waiters.length - 1; i >= 0; i--) { + const w = waiters[i]; + if (w?.predicate(m)) { + clearTimeout(w.timer); + waiters.splice(i, 1); + w.resolve(m); + } + } + } + } catch (err) { + // Surface frame errors to any pending waiter. + for (const w of waiters) { + clearTimeout(w.timer); + w.reject(err as Error); + } + waiters.length = 0; + } + }); + + socket.on("close", () => { + isClosed = true; + for (const cb of closeCbs) cb(); + }); + socket.once("error", reject); + socket.once("connect", () => { + socket.off("error", reject); + resolve({ + socket, + messages, + send(m) { + if (!socket.destroyed) socket.write(encodeFrame(m)); + }, + sendRaw(buf) { + if (!socket.destroyed) socket.write(buf); + }, + waitFor(predicate, ms = 5000) { + return new Promise((res, rej) => { + const found = messages.find(predicate); + if (found) return res(found); + const timer = setTimeout(() => { + const i = waiters.findIndex((w) => w.predicate === predicate); + if (i >= 0) waiters.splice(i, 1); + rej(new Error(`waitFor timed out after ${ms}ms`)); + }, ms); + waiters.push({ predicate, resolve: res, reject: rej, timer }); + }); + }, + collect(predicate, ms) { + return new Promise((res) => { + const collected: ServerMessage[] = messages.filter(predicate); + const onMsg = (chunk: Buffer) => { + void chunk; + for (let i = collected.length; i < messages.length; i++) { + const m = messages[i]; + if (m && predicate(m)) collected.push(m); + } + }; + socket.on("data", onMsg); + setTimeout(() => { + socket.off("data", onMsg); + // Final sweep in case of late drains. + for (let i = collected.length; i < messages.length; i++) { + const m = messages[i]; + if (m && predicate(m)) collected.push(m); + } + res(collected); + }, ms); + }); + }, + close() { + return new Promise((res) => { + if (socket.destroyed) return res(); + socket.end(() => res()); + // Fall back: if `end` doesn't fire close within 200ms, force. + setTimeout(() => { + if (!socket.destroyed) socket.destroy(); + res(); + }, 200); + }); + }, + closed() { + return isClosed; + }, + onClose(cb) { + if (isClosed) cb(); + else closeCbs.push(cb); + }, + }); + }); + }); +} + +/** Convenience: connect and complete the v1 handshake. */ +export async function connectAndHello( + socketPath: string, +): Promise { + const c = await connect(socketPath); + c.send({ type: "hello", protocols: [1] }); + await c.waitFor((m) => m.type === "hello-ack"); + return c; +} diff --git a/packages/pty-daemon/test/integration.test.ts b/packages/pty-daemon/test/integration.test.ts new file mode 100644 index 00000000000..4d4ee962f16 --- /dev/null +++ b/packages/pty-daemon/test/integration.test.ts @@ -0,0 +1,91 @@ +// Smoke / happy-path integration test for pty-daemon. +// +// Runs under Node (`node --experimental-strip-types --test`); see +// test/control-plane.test.ts for the exhaustive control-plane scenarios. + +import { strict as assert } from "node:assert"; +import * as os from "node:os"; +import * as path from "node:path"; +import { after, before, test } from "node:test"; +import { Server } from "../src/Server/index.ts"; +import { connect, connectAndHello } from "./helpers/client.ts"; + +const sockPath = path.join(os.tmpdir(), `pty-daemon-smoke-${process.pid}.sock`); +let server: Server; + +before(async () => { + server = new Server({ socketPath: sockPath, daemonVersion: "0.0.0-test" }); + await server.listen(); +}); + +after(async () => { + await server.close(); +}); + +test("handshake: hello → hello-ack", async () => { + const c = await connect(sockPath); + c.send({ type: "hello", protocols: [1] }); + const ack = await c.waitFor((m) => m.type === "hello-ack"); + assert.equal(ack.type, "hello-ack"); + if (ack.type === "hello-ack") { + assert.equal(ack.protocol, 1); + assert.equal(ack.daemonVersion, "0.0.0-test"); + } + await c.close(); +}); + +test("open → subscribe → output → exit lifecycle", async () => { + const c = await connectAndHello(sockPath); + c.send({ + type: "open", + id: "smoke-0", + meta: { + shell: "/bin/sh", + argv: ["-c", "echo daemon-smoke; sleep 0.2"], + cols: 80, + rows: 24, + }, + }); + await c.waitFor((m) => m.type === "open-ok" && m.id === "smoke-0"); + c.send({ type: "subscribe", id: "smoke-0", replay: true }); + + await c.waitFor( + (m) => + m.type === "output" && + m.id === "smoke-0" && + Buffer.from(m.data, "base64").toString().includes("daemon-smoke"), + 3000, + ); + const exit = await c.waitFor( + (m) => m.type === "exit" && m.id === "smoke-0", + 3000, + ); + if (exit.type === "exit") assert.equal(exit.code, 0); + await c.close(); +}); + +test("input is forwarded and echoed via output", async () => { + const c = await connectAndHello(sockPath); + c.send({ + type: "open", + id: "smoke-1", + meta: { shell: "/bin/sh", argv: ["-i"], cols: 80, rows: 24 }, + }); + await c.waitFor((m) => m.type === "open-ok"); + c.send({ type: "subscribe", id: "smoke-1", replay: false }); + c.send({ + type: "input", + id: "smoke-1", + data: Buffer.from("echo abc-marker\n").toString("base64"), + }); + await c.waitFor( + (m) => + m.type === "output" && + m.id === "smoke-1" && + Buffer.from(m.data, "base64").toString().includes("abc-marker"), + 3000, + ); + c.send({ type: "close", id: "smoke-1", signal: "SIGTERM" }); + await c.waitFor((m) => m.type === "closed" && m.id === "smoke-1"); + await c.close(); +}); diff --git a/packages/pty-daemon/tsconfig.json b/packages/pty-daemon/tsconfig.json new file mode 100644 index 00000000000..555dc9e9cf9 --- /dev/null +++ b/packages/pty-daemon/tsconfig.json @@ -0,0 +1,12 @@ +{ + "extends": "@superset/typescript/internal-package.json", + "compilerOptions": { + "types": ["bun-types", "node"], + "noUncheckedIndexedAccess": true, + "allowImportingTsExtensions": true, + "emitDeclarationOnly": false, + "noEmit": true + }, + "include": ["src", "test"], + "exclude": ["node_modules", "dist"] +}