diff --git a/apps/desktop/src/renderer/lib/terminal/terminal-ws-transport.ts b/apps/desktop/src/renderer/lib/terminal/terminal-ws-transport.ts index 3daa24211ee..ad36e7851c9 100644 --- a/apps/desktop/src/renderer/lib/terminal/terminal-ws-transport.ts +++ b/apps/desktop/src/renderer/lib/terminal/terminal-ws-transport.ts @@ -307,7 +307,14 @@ function attachSocketListeners( // channel; renderer treats them identically). Pipe straight into // xterm without any decoding step. if (event.data instanceof ArrayBuffer) { - terminal.write(new Uint8Array(event.data)); + // xterm.write's callback only fires once the parser has consumed + // these bytes into its buffer. That's the honest "consumed" signal + // for byte-level back-pressure — ACKing on WS receipt would lie + // about xterm being the bottleneck. + const bytes = event.data.byteLength; + terminal.write(new Uint8Array(event.data), () => { + sendOutputAck(transport, socket, bytes); + }); transport._hasReceivedBytes = true; return; } @@ -384,6 +391,17 @@ function attachSocketListeners( }); } +function sendOutputAck( + transport: TerminalTransport, + socket: WebSocket, + bytes: number, +) { + if (transport.socket !== socket) return; + if (socket.readyState !== WebSocket.OPEN) return; + if (transport.connectionState !== "open") return; + socket.send(JSON.stringify({ type: "output-ack", bytes })); +} + export function disconnect(transport: TerminalTransport) { cancelReconnect(transport); if (transport.socket) { diff --git a/packages/host-service/src/terminal/DaemonClient/DaemonClient.ts b/packages/host-service/src/terminal/DaemonClient/DaemonClient.ts index 6d529ddd5e1..bd6ea2af48c 100644 --- a/packages/host-service/src/terminal/DaemonClient/DaemonClient.ts +++ b/packages/host-service/src/terminal/DaemonClient/DaemonClient.ts @@ -186,15 +186,29 @@ export class DaemonClient { this.send({ type: "resize", id, cols, rows }); } + /** + * Fire-and-forget back-pressure ack. Tells the daemon that `bytes` of + * output for this session have been consumed by the renderer. Only + * meaningful when the subscription was opened with `flowControl: true`; + * otherwise the daemon has no counter to credit and ignores the message. + */ + ackOutput(id: string, bytes: number): void { + if (!Number.isFinite(bytes) || bytes <= 0) return; + this.send({ type: "ack-output", id, bytes: Math.floor(bytes) }); + } + /** * Subscribe to a session's output + exit stream. Returns an unsubscribe * function. With `replay: true` the daemon sends its current ring buffer * before live streaming begins. Multiple subscribers per session are * supported — the daemon fans output out to all of them. + * + * `flowControl: true` opts this subscription into byte-level ACK back- + * pressure (caller must invoke ackOutput as bytes are consumed). */ subscribe( id: string, - opts: { replay: boolean }, + opts: { replay: boolean; flowControl?: boolean }, cb: SubscribeCallbacks, ): () => void { let entry = this.callbacks.get(id); @@ -218,7 +232,12 @@ export class DaemonClient { ); } if (wasFirst) { - this.send({ type: "subscribe", id, replay: opts.replay }); + this.send({ + type: "subscribe", + id, + replay: opts.replay, + flowControl: opts.flowControl === true, + }); } return () => { const e = this.callbacks.get(id); diff --git a/packages/host-service/src/terminal/terminal.ts b/packages/host-service/src/terminal/terminal.ts index 3e9559843f9..5ace3f1c0e6 100644 --- a/packages/host-service/src/terminal/terminal.ts +++ b/packages/host-service/src/terminal/terminal.ts @@ -66,6 +66,11 @@ interface DaemonPty { */ writeBytes(bytes: Uint8Array): void; resize(cols: number, rows: number): void; + /** + * Forward the renderer's "I consumed N bytes" ack to the daemon's flow- + * control counter for the primary subscription on this session. + */ + ackOutput(bytes: number): void; kill(signal?: NodeJS.Signals): Promise; onData(cb: (data: string) => void): PtyDataDisposer; onExit( @@ -96,6 +101,13 @@ function makeDaemonPty( // Daemon may have disconnected; surface via the next op. } }, + ackOutput(bytes) { + try { + daemon.ackOutput(sessionId, bytes); + } catch { + // Daemon may have disconnected; reconnect path handles recovery. + } + }, kill(signal) { return daemon.close(sessionId, toDaemonSignal(signal)); }, @@ -160,6 +172,7 @@ function getHostAgentHookUrl(): string { type TerminalClientMessage = | { type: "input"; data: string } | { type: "resize"; cols: number; rows: number } + | { type: "output-ack"; bytes: number } | { type: "dispose" }; // PTY output bytes travel as binary WebSocket frames — the renderer pipes @@ -1274,7 +1287,7 @@ export async function createTerminalSessionInternal({ session.unsubscribeDaemon = daemon.subscribe( terminalId, - { replay: replayOnAdoption }, + { replay: replayOnAdoption, flowControl: true }, { onOutput(chunk) { // Bytes flow daemon → host → xterm without UTF-8 decoding; @@ -1591,6 +1604,11 @@ export function registerWorkspaceTerminalRoute({ const session = sessions.get(terminalId ?? ""); if (!session || !session.sockets.has(ws)) return; + if (message.type === "output-ack") { + session.pty.ackOutput(message.bytes); + return; + } + if (message.type === "dispose") { disposeSession(terminalId ?? "", db); return; diff --git a/packages/pty-daemon/package.json b/packages/pty-daemon/package.json index 75bd0c78c84..2d7a0e680d9 100644 --- a/packages/pty-daemon/package.json +++ b/packages/pty-daemon/package.json @@ -30,7 +30,7 @@ "build:daemon": "bun run build.ts", "typecheck": "tsc --noEmit --emitDeclarationOnly false", "test": "bun test src/protocol src/SessionStore src/handlers src/Pty/Pty.test.ts test/no-encoding-hops.test.ts", - "test:integration": "node --experimental-strip-types --test test/integration.test.ts test/control-plane.test.ts test/signal-recovery.test.ts test/byte-fidelity.test.ts test/handoff.test.ts" + "test:integration": "node --experimental-strip-types --test test/integration.test.ts test/control-plane.test.ts test/signal-recovery.test.ts test/byte-fidelity.test.ts test/handoff.test.ts test/flow-control.test.ts" }, "dependencies": { "node-pty": "1.1.0" diff --git a/packages/pty-daemon/src/Pty/Pty.ts b/packages/pty-daemon/src/Pty/Pty.ts index 054b09c6172..19f0f206c6d 100644 --- a/packages/pty-daemon/src/Pty/Pty.ts +++ b/packages/pty-daemon/src/Pty/Pty.ts @@ -22,6 +22,13 @@ export interface Pty { readonly pid: number; readonly meta: SessionMeta; write(data: Buffer): void; + /** + * Stop reading from the PTY master fd. The shell (slave side) eventually + * blocks on `write` once the kernel pipe buffer fills — real upstream + * back-pressure, not a software flag. + */ + pause(): void; + resume(): void; resize(cols: number, rows: number): void; kill(signal?: NodeJS.Signals): void; onData(cb: PtyOnData): void; @@ -84,6 +91,14 @@ class NodePtyAdapter implements Pty { this.term.write(data as unknown as string); } + pause(): void { + this.term.pause(); + } + + resume(): void { + this.term.resume(); + } + resize(cols: number, rows: number): void { validateDims(cols, rows); this.term.resize(cols, rows); @@ -253,6 +268,14 @@ class AdoptedPty implements Pty { return this.fd; } + pause(): void { + this.reader.pause(); + } + + resume(): void { + this.reader.resume(); + } + write(data: Buffer): void { if (this.exitFired) { throw new Error(`session exited: ${this.pid}`); diff --git a/packages/pty-daemon/src/Server/Server.ts b/packages/pty-daemon/src/Server/Server.ts index a7ffa918b2e..cc2070cc7a4 100644 --- a/packages/pty-daemon/src/Server/Server.ts +++ b/packages/pty-daemon/src/Server/Server.ts @@ -15,6 +15,7 @@ import { } from "../handlers/index.ts"; import { adoptFromFd } from "../Pty/index.ts"; import { + type AckOutputMessage, type ClientMessage, encodeFrame, FrameDecoder, @@ -45,6 +46,12 @@ export interface ServerOptions { const DEFAULT_OUTBOUND_BUFFER_CAP_BYTES = 8 * 1024 * 1024; +// Watermarks for output back-pressure. Mirrors VS Code's terminal flow-control +// defaults (their tracker counts characters; we count bytes because the wire +// is byte-native). 100KB high → pause the PTY; 5KB low → resume. +const FLOW_CONTROL_HIGH_WATERMARK = 100_000; +const FLOW_CONTROL_LOW_WATERMARK = 5_000; + interface ConnState extends Conn { socket: net.Socket; decoder: FrameDecoder; @@ -56,6 +63,8 @@ export class Server { private readonly store: SessionStore; private readonly conns = new Set(); private readonly opts: ServerOptions; + /** Sessions whose PTY is currently paused by flow control. */ + private readonly pausedSessions = new Set(); constructor(opts: ServerOptions) { this.opts = opts; @@ -317,6 +326,7 @@ export class Server { decoder: new FrameDecoder(), negotiated: null, subscriptions: new Set(), + flowControlUnacked: new Map(), send: (msg, payload) => writeMessage(socket, msg, payload, outboundBufferCap), }; @@ -338,10 +348,10 @@ export class Server { } }); socket.on("close", () => { - this.conns.delete(conn); + this.dropConn(conn); }); socket.on("error", () => { - this.conns.delete(conn); + this.dropConn(conn); }); } @@ -415,6 +425,11 @@ export class Server { } case "unsubscribe": { handleUnsubscribe(conn, msg); + this.maybeResume(msg.id); + return; + } + case "ack-output": { + this.handleAckOutput(conn, msg); return; } case "prepare-upgrade": { @@ -464,8 +479,14 @@ export class Server { this.store.appendOutput(session, chunk); const out: ServerMessage = { type: "output", id: session.id }; for (const c of this.conns) { - if (c.subscriptions.has(session.id)) c.send(out, chunk); + if (!c.subscriptions.has(session.id)) continue; + c.send(out, chunk); + const prev = c.flowControlUnacked.get(session.id); + if (prev !== undefined) { + c.flowControlUnacked.set(session.id, prev + chunk.byteLength); + } } + this.maybePause(session); }); session.pty.onExit((info) => { session.exited = true; @@ -481,8 +502,10 @@ export class Server { if (c.subscriptions.has(session.id)) { c.send(ev); c.subscriptions.delete(session.id); + c.flowControlUnacked.delete(session.id); } } + this.pausedSessions.delete(session.id); // Delete the session immediately. Without this, every closed // terminal pane left a row in the store forever — list-reply // inflated, memory grew unbounded. @@ -496,6 +519,70 @@ export class Server { this.store.delete(session.id); }); } + + private handleAckOutput(conn: ConnState, msg: AckOutputMessage): void { + if ( + !Number.isFinite(msg.bytes) || + !Number.isInteger(msg.bytes) || + msg.bytes <= 0 + ) { + conn.send({ + type: "error", + id: msg.id, + message: `invalid ack-output bytes: ${String(msg.bytes)}`, + code: "EPROTO", + }); + return; + } + const outstanding = conn.flowControlUnacked.get(msg.id); + if (outstanding === undefined) return; + const next = Math.max(0, outstanding - msg.bytes); + conn.flowControlUnacked.set(msg.id, next); + this.maybeResume(msg.id); + } + + private maybePause(session: Session): void { + if (this.pausedSessions.has(session.id)) return; + for (const c of this.conns) { + const u = c.flowControlUnacked.get(session.id); + if (u !== undefined && u > FLOW_CONTROL_HIGH_WATERMARK) { + try { + session.pty.pause(); + } catch { + // already exited; nothing to back-pressure + } + this.pausedSessions.add(session.id); + return; + } + } + } + + private maybeResume(sessionId: string): void { + if (!this.pausedSessions.has(sessionId)) return; + for (const c of this.conns) { + const u = c.flowControlUnacked.get(sessionId); + if (u !== undefined && u > FLOW_CONTROL_LOW_WATERMARK) return; + } + const session = this.store.get(sessionId); + this.pausedSessions.delete(sessionId); + if (session && !session.exited) { + try { + session.pty.resume(); + } catch { + // already exited; nothing to resume + } + } + } + + private dropConn(conn: ConnState): void { + if (!this.conns.delete(conn)) return; + // A dropped conn's unacked bytes effectively become "consumed" as far + // as the back-pressure decision is concerned — clearing them lets any + // PTY that was paused on this conn's behalf resume. + const sessionIds = Array.from(conn.flowControlUnacked.keys()); + conn.flowControlUnacked.clear(); + for (const id of sessionIds) this.maybeResume(id); + } } /** diff --git a/packages/pty-daemon/src/SessionStore/SessionStore.test.ts b/packages/pty-daemon/src/SessionStore/SessionStore.test.ts index f2ab3992997..a7b4bb7add3 100644 --- a/packages/pty-daemon/src/SessionStore/SessionStore.test.ts +++ b/packages/pty-daemon/src/SessionStore/SessionStore.test.ts @@ -12,6 +12,8 @@ function fakePty(meta: { cols: number; rows: number }): Pty { rows: meta.rows, }, write: () => {}, + pause: () => {}, + resume: () => {}, resize: () => {}, kill: () => {}, onData: () => {}, diff --git a/packages/pty-daemon/src/SessionStore/snapshot.test.ts b/packages/pty-daemon/src/SessionStore/snapshot.test.ts index 79ee26ce301..3cb7fe1262b 100644 --- a/packages/pty-daemon/src/SessionStore/snapshot.test.ts +++ b/packages/pty-daemon/src/SessionStore/snapshot.test.ts @@ -17,6 +17,8 @@ function fakePty(pid: number, meta: { cols: number; rows: number }): Pty { pid, meta: { shell: "/bin/sh", argv: [], cols: meta.cols, rows: meta.rows }, write: () => {}, + pause: () => {}, + resume: () => {}, resize: () => {}, kill: () => {}, onData: () => {}, diff --git a/packages/pty-daemon/src/handlers/handlers.test.ts b/packages/pty-daemon/src/handlers/handlers.test.ts index 8facfa1f3f9..f39cea3aa44 100644 --- a/packages/pty-daemon/src/handlers/handlers.test.ts +++ b/packages/pty-daemon/src/handlers/handlers.test.ts @@ -28,6 +28,8 @@ function makeFakePty(state: FakePtyState, meta: SpawnOptions["meta"]): Pty { pid: state.pid, meta, write: (b) => state.written.push(b), + pause: () => {}, + resume: () => {}, resize: (c, r) => { state.cols = c; state.rows = r; @@ -51,6 +53,7 @@ function makeConn(): Conn & { sent: SentFrame[] } { return { sent, subscriptions: new Set(), + flowControlUnacked: new Map(), send: (m, payload) => sent.push({ message: m, payload: payload ?? null }), }; } diff --git a/packages/pty-daemon/src/handlers/handlers.ts b/packages/pty-daemon/src/handlers/handlers.ts index 461f6b6d0e3..c519a05e80f 100644 --- a/packages/pty-daemon/src/handlers/handlers.ts +++ b/packages/pty-daemon/src/handlers/handlers.ts @@ -27,6 +27,13 @@ import type { Session, SessionStore } from "../SessionStore/index.ts"; */ export interface Conn { subscriptions: Set; + /** + * Per-session unacked-bytes counter for back-pressure. Populated only for + * sessions whose subscription set `flowControl: true`; absent entries + * indicate the subscription is not flow-controlled (output is fire-and- + * forget, daemon trusts socket-level backpressure). + */ + flowControlUnacked: Map; send(message: ServerMessage, payload?: Uint8Array): void; } @@ -150,17 +157,26 @@ export function handleSubscribe( return; } conn.subscriptions.add(msg.id); + if (msg.flowControl) conn.flowControlUnacked.set(msg.id, 0); if (msg.replay) { const snap = ctx.store.snapshotBuffer(session); if (snap.byteLength > 0) { const out: OutputMessage = { type: "output", id: msg.id }; conn.send(out, snap); + // Replay rides the same `output` frame the renderer ACKs on parse, + // so charge it to the flow-control counter — otherwise the daemon + // under-counts what's in flight on subscribe and lets the renderer + // chew through `replay + 100KB live` before back-pressure kicks in. + if (msg.flowControl) { + conn.flowControlUnacked.set(msg.id, snap.byteLength); + } } } } export function handleUnsubscribe(conn: Conn, msg: UnsubscribeMessage): void { conn.subscriptions.delete(msg.id); + conn.flowControlUnacked.delete(msg.id); } function errorFor( diff --git a/packages/pty-daemon/src/protocol/index.ts b/packages/pty-daemon/src/protocol/index.ts index f7915e36fca..8b136144c3f 100644 --- a/packages/pty-daemon/src/protocol/index.ts +++ b/packages/pty-daemon/src/protocol/index.ts @@ -5,6 +5,7 @@ export type { UpgradeNakMessage, } from "./handoff.ts"; export type { + AckOutputMessage, ClientMessage, ClosedMessage, CloseMessage, diff --git a/packages/pty-daemon/src/protocol/messages.ts b/packages/pty-daemon/src/protocol/messages.ts index 269b3bac1dc..fee4b51492e 100644 --- a/packages/pty-daemon/src/protocol/messages.ts +++ b/packages/pty-daemon/src/protocol/messages.ts @@ -79,6 +79,12 @@ export interface SubscribeMessage { id: string; /** if true, replay buffered output before live streaming */ replay: boolean; + /** + * Opt this subscription into byte-level ACK back-pressure. Daemon counts + * unacked output bytes; when the count crosses the high watermark it + * pauses the PTY, resuming once acks drop below the low watermark. + */ + flowControl?: boolean; } export interface UnsubscribeMessage { @@ -86,6 +92,17 @@ export interface UnsubscribeMessage { id: string; } +/** + * Subscriber tells the daemon it has consumed `bytes` of output for this + * session. Drives the flow-control counter; only meaningful when the + * subscription was opened with `flowControl: true`. + */ +export interface AckOutputMessage { + type: "ack-output"; + id: string; + bytes: number; +} + /** * Phase 2: client tells the daemon to spawn a successor process and hand * the PTY master fds over via stdio inheritance. Daemon replies with @@ -158,6 +175,7 @@ export type ClientMessage = | ListMessage | SubscribeMessage | UnsubscribeMessage + | AckOutputMessage | PrepareUpgradeMessage; export type ServerMessage = diff --git a/packages/pty-daemon/test/byte-fidelity.test.ts b/packages/pty-daemon/test/byte-fidelity.test.ts index f324e48ac7d..5141af5a6ef 100644 --- a/packages/pty-daemon/test/byte-fidelity.test.ts +++ b/packages/pty-daemon/test/byte-fidelity.test.ts @@ -43,6 +43,7 @@ const sockPath = path.join(os.tmpdir(), `pty-daemon-bytes-${process.pid}.sock`); */ interface DriveablePty extends Pty { writes: Buffer[]; + paused: boolean; emit(bytes: Uint8Array): void; finish(code: number): void; } @@ -58,9 +59,16 @@ function makeDriveablePty(meta: SpawnOptions["meta"]): DriveablePty { pid, meta, writes: [] as Buffer[], + paused: false as boolean, write: (data: Buffer) => { pty.writes.push(Buffer.from(data)); }, + pause: () => { + pty.paused = true; + }, + resume: () => { + pty.paused = false; + }, resize: () => {}, kill: () => {}, getMasterFd: () => -1, diff --git a/packages/pty-daemon/test/flow-control.test.ts b/packages/pty-daemon/test/flow-control.test.ts new file mode 100644 index 00000000000..5479a3d4ab3 --- /dev/null +++ b/packages/pty-daemon/test/flow-control.test.ts @@ -0,0 +1,217 @@ +// Daemon flow-control: subscribers that opt in via `flowControl: true` cause +// the PTY to pause once unacked output crosses the high watermark, and to +// resume once acks bring outstanding bytes back below the low watermark. + +import { strict as assert } from "node:assert"; +import * as os from "node:os"; +import * as path from "node:path"; +import { after, before, test } from "node:test"; +import type { + Pty, + PtyOnData, + PtyOnExit, + SpawnOptions, +} from "../src/Pty/index.ts"; +import { Server } from "../src/Server/index.ts"; +import { connectAndHello, type DaemonClient } from "./helpers/client.ts"; + +const sockPath = path.join(os.tmpdir(), `pty-daemon-flow-${process.pid}.sock`); + +interface DriveablePty extends Pty { + paused: boolean; + pauseCount: number; + resumeCount: number; + emit(bytes: Uint8Array): void; + finish(code: number): void; +} + +let nextPid = 9000; +let lastSpawned: DriveablePty | null = null; + +function makeDriveablePty(meta: SpawnOptions["meta"]): DriveablePty { + const onDataCbs: PtyOnData[] = []; + const onExitCbs: PtyOnExit[] = []; + const pty = { + pid: nextPid++, + meta, + paused: false as boolean, + pauseCount: 0, + resumeCount: 0, + write: () => {}, + pause: () => { + pty.paused = true; + pty.pauseCount += 1; + }, + resume: () => { + pty.paused = false; + pty.resumeCount += 1; + }, + resize: () => {}, + kill: () => {}, + getMasterFd: () => -1, + onData: (cb: PtyOnData) => { + onDataCbs.push(cb); + }, + onExit: (cb: PtyOnExit) => { + onExitCbs.push(cb); + }, + emit: (bytes: Uint8Array) => { + for (const cb of onDataCbs) cb(Buffer.from(bytes)); + }, + finish: (code: number) => { + for (const cb of onExitCbs) cb({ code, signal: null }); + }, + } satisfies DriveablePty; + return pty; +} + +let server: Server; + +before(async () => { + server = new Server({ + socketPath: sockPath, + daemonVersion: "0.0.0-flow", + bufferCap: 512 * 1024, + spawnPty: ({ meta }) => { + const pty = makeDriveablePty(meta); + lastSpawned = pty; + return pty; + }, + }); + await server.listen(); +}); + +after(async () => { + await server.close(); +}); + +const META = { + shell: "/bin/sh", + argv: [] as string[], + cols: 80, + rows: 24, +}; + +async function openPty(c: DaemonClient, id: string): Promise { + lastSpawned = null; + c.send({ type: "open", id, meta: META }); + await c.waitFor((m) => m.type === "open-ok" && m.id === id); + assert.ok(lastSpawned, "spawnPty hook must have fired"); + return lastSpawned; +} + +/** + * Subscribe replies nothing on success; round-trip a `list` to ensure the + * preceding `subscribe` has been processed before we drive the PTY. + */ +async function subscribe( + c: DaemonClient, + id: string, + flowControl: boolean, +): Promise { + const reply = c.waitForNext((m) => m.type === "list-reply", 1000); + c.send({ type: "subscribe", id, replay: false, flowControl }); + c.send({ type: "list" }); + await reply; +} + +async function ackAndSettle( + c: DaemonClient, + id: string, + bytes: number, +): Promise { + const reply = c.waitForNext((m) => m.type === "list-reply", 1000); + c.send({ type: "ack-output", id, bytes }); + c.send({ type: "list" }); + await reply; +} + +test("flowControl pauses PTY at high watermark, resumes once acked below low watermark", async () => { + const c = await connectAndHello(sockPath); + const id = "fc-basic"; + let pty: DriveablePty | null = null; + try { + pty = await openPty(c, id); + await subscribe(c, id, true); + + // Single chunk above the 100 KB high watermark → pause exactly once. + pty.emit(new Uint8Array(120_000)); + // Round-trip to make sure the pause callback has run. + const reply = c.waitForNext((m) => m.type === "list-reply", 1000); + c.send({ type: "list" }); + await reply; + assert.equal(pty.pauseCount, 1, "PTY should be paused exactly once"); + assert.equal(pty.resumeCount, 0, "PTY should not have resumed yet"); + assert.equal(pty.paused, true); + + // Ack 110 KB — outstanding is now ~10 KB, still above the 5 KB low + // watermark, so no resume yet. + await ackAndSettle(c, id, 110_000); + assert.equal(pty.resumeCount, 0, "should not resume above low watermark"); + assert.equal(pty.paused, true); + + // Ack the last ~10 KB — drops below 5 KB low watermark, resume fires. + await ackAndSettle(c, id, 10_000); + assert.equal(pty.resumeCount, 1, "PTY should resume exactly once"); + assert.equal(pty.paused, false); + } finally { + c.send({ type: "close", id }); + pty?.finish(0); + await c.close(); + } +}); + +test("subscriber without flowControl does not pause the PTY", async () => { + const c = await connectAndHello(sockPath); + const id = "fc-disabled"; + let pty: DriveablePty | null = null; + try { + pty = await openPty(c, id); + await subscribe(c, id, false); + + pty.emit(new Uint8Array(500_000)); + const reply = c.waitForNext((m) => m.type === "list-reply", 1000); + c.send({ type: "list" }); + await reply; + assert.equal(pty.pauseCount, 0, "no opt-in → no pause"); + } finally { + c.send({ type: "close", id }); + pty?.finish(0); + await c.close(); + } +}); + +test("connection drop releases unacked bytes and resumes paused PTY", async () => { + const slow = await connectAndHello(sockPath); + const opener = await connectAndHello(sockPath); + const id = "fc-disconnect"; + let pty: DriveablePty | null = null; + try { + pty = await openPty(opener, id); + await subscribe(slow, id, true); + + pty.emit(new Uint8Array(120_000)); + const reply = slow.waitForNext((m) => m.type === "list-reply", 1000); + slow.send({ type: "list" }); + await reply; + assert.equal(pty.pauseCount, 1); + + await slow.close(); + // Server learns about the close asynchronously; give it a moment, then + // round-trip via the remaining connection to ensure dropConn has run. + await new Promise((r) => setTimeout(r, 20)); + const reply2 = opener.waitForNext((m) => m.type === "list-reply", 1000); + opener.send({ type: "list" }); + await reply2; + assert.equal( + pty.resumeCount, + 1, + "dropping the slow conn should release its unacked bytes", + ); + assert.equal(pty.paused, false); + } finally { + opener.send({ type: "close", id }); + pty?.finish(0); + await opener.close(); + } +});