Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 6 additions & 19 deletions apps/desktop/src/renderer/lib/terminal/terminal-ws-transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -307,14 +307,12 @@ function attachSocketListeners(
// channel; renderer treats them identically). Pipe straight into
// xterm without any decoding step.
if (event.data instanceof ArrayBuffer) {
// xterm.write's callback only fires once the parser has consumed
// these bytes into its buffer. That's the honest "consumed" signal
// for byte-level back-pressure — ACKing on WS receipt would lie
// about xterm being the bottleneck.
const bytes = event.data.byteLength;
terminal.write(new Uint8Array(event.data), () => {
sendOutputAck(transport, socket, bytes);
});
// Pipe PTY bytes straight into xterm. There's no output ACK back to
// host-service: back-pressure lives entirely on the host side, which
// bounds this socket's send buffer and drops us (we reconnect and
// replay) if we fall hopelessly behind. That means a slow/stalled
// renderer can never wedge the shell — it just loses some scrollback.
terminal.write(new Uint8Array(event.data));
transport._hasReceivedBytes = true;
return;
}
Expand Down Expand Up @@ -391,17 +389,6 @@ function attachSocketListeners(
});
}

function sendOutputAck(
transport: TerminalTransport,
socket: WebSocket,
bytes: number,
) {
if (transport.socket !== socket) return;
if (socket.readyState !== WebSocket.OPEN) return;
if (transport.connectionState !== "open") return;
socket.send(JSON.stringify({ type: "output-ack", bytes }));
}

export function disconnect(transport: TerminalTransport) {
cancelReconnect(transport);
if (transport.socket) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import { describe, expect, it } from "bun:test";
import {
getAllowedSectionsForVariant,
getVisibleItemsForSection,
SETTING_ITEM_ID,
type SettingsItem,
searchSettings,
Expand Down Expand Up @@ -64,15 +62,4 @@ describe("settings search - font settings", () => {
expect(editorFont?.section).toBe("appearance");
expect(terminalFont?.section).toBe("appearance");
});

it("keeps the Git tab visible in v2 for the user worktree location", () => {
expect(getAllowedSectionsForVariant(true).has("git")).toBe(true);
expect(
getVisibleItemsForSection({
section: "git",
searchQuery: "",
isV2: true,
}),
).toEqual([SETTING_ITEM_ID.GIT_WORKTREE_LOCATION]);
});
});
17 changes: 1 addition & 16 deletions packages/host-service/src/terminal/DaemonClient/DaemonClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,29 +186,15 @@ export class DaemonClient {
this.send({ type: "resize", id, cols, rows });
}

/**
* Fire-and-forget back-pressure ack. Tells the daemon that `bytes` of
* output for this session have been consumed by the renderer. Only
* meaningful when the subscription was opened with `flowControl: true`;
* otherwise the daemon has no counter to credit and ignores the message.
*/
ackOutput(id: string, bytes: number): void {
if (!Number.isFinite(bytes) || bytes <= 0) return;
this.send({ type: "ack-output", id, bytes: Math.floor(bytes) });
}

/**
* Subscribe to a session's output + exit stream. Returns an unsubscribe
* function. With `replay: true` the daemon sends its current ring buffer
* before live streaming begins. Multiple subscribers per session are
* supported — the daemon fans output out to all of them.
*
* `flowControl: true` opts this subscription into byte-level ACK back-
* pressure (caller must invoke ackOutput as bytes are consumed).
*/
subscribe(
id: string,
opts: { replay: boolean; flowControl?: boolean },
opts: { replay: boolean },
cb: SubscribeCallbacks,
): () => void {
let entry = this.callbacks.get(id);
Expand Down Expand Up @@ -236,7 +222,6 @@ export class DaemonClient {
type: "subscribe",
id,
replay: opts.replay,
flowControl: opts.flowControl === true,
});
}
return () => {
Expand Down
70 changes: 70 additions & 0 deletions packages/host-service/src/terminal/terminal.adoption.node-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,70 @@ describe("createTerminalSessionInternal — host-service restart adoption", () =

await disposeSessionAndWait(terminalId, db);
});

// Regression: SUPER-939 / #4993 — heavy/concurrent output must never wedge
// the shell. Output flow control is gone; back-pressure is bounded buffering
// on the host side, never a producer pause. These guard both halves of that.

test("heavy output with no renderer attached never wedges the PTY", async () => {
const terminalId = `e2e-heavy-nobody-${randomUUID().slice(0, 8)}`;
const result = await createTerminalSessionInternal({
terminalId,
workspaceId,
db,
listed: true,
});
assert.ok(!("error" in result));
if ("error" in result) return;

// ~3 MB with no socket attached — far past any old watermark. With the
// ACK flow control removed, the daemon never pauses, so this completes;
// the bounded replay buffer just keeps the tail (incl. the marker).
const marker = `heavy-done-${randomUUID().slice(0, 6)}`;
result.pty.write(
`i=0; while [ "$i" -lt 48000 ]; do printf '0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef\\n'; i=$((i + 1)); done; echo ${marker}\n`,
);

await waitFor(() => sessionBufferText(result).includes(marker), 15_000);
await disposeSessionAndWait(terminalId, db);
});

test("a renderer whose send buffer exceeds the cap is dropped; output keeps flowing", async () => {
const terminalId = `e2e-slow-renderer-${randomUUID().slice(0, 8)}`;
const result = await createTerminalSessionInternal({
terminalId,
workspaceId,
db,
listed: true,
});
assert.ok(!("error" in result));
if ("error" in result) return;

// A renderer that's permanently behind: its WS send buffer never drains,
// so bufferedAmount sits way over the 8 MB cap. broadcastBytes must drop
// it instead of buffering forever.
let closed = false;
const stuckSocket = {
send: () => {},
close: () => {
closed = true;
},
readyState: 1, // SOCKET_OPEN
raw: { bufferedAmount: 64 * 1024 * 1024 },
};
result.sockets.add(stuckSocket);

const marker = `slow-done-${randomUUID().slice(0, 6)}`;
result.pty.write(
`i=0; while [ "$i" -lt 6000 ]; do printf '0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef\\n'; i=$((i + 1)); done; echo ${marker}\n`,
);

// The stuck socket is closed and removed on the next broadcast, and the
// PTY keeps producing — the marker lands in the (now socketless) buffer.
await waitFor(() => closed && !result.sockets.has(stuckSocket), 10_000);
await waitFor(() => sessionBufferText(result).includes(marker), 15_000);
await disposeSessionAndWait(terminalId, db);
});
});

// ---------------- helpers ----------------
Expand Down Expand Up @@ -667,3 +731,9 @@ async function waitForOutput(
disposer.dispose();
}
}

function sessionBufferText(session: { buffer: Uint8Array[] }): string {
return Buffer.concat(session.buffer.map((b) => Buffer.from(b))).toString(
"utf8",
);
}
49 changes: 30 additions & 19 deletions packages/host-service/src/terminal/terminal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,6 @@ interface DaemonPty {
*/
writeBytes(bytes: Uint8Array): void;
resize(cols: number, rows: number): void;
/**
* Forward the renderer's "I consumed N bytes" ack to the daemon's flow-
* control counter for the primary subscription on this session.
*/
ackOutput(bytes: number): void;
kill(signal?: NodeJS.Signals): Promise<void>;
onData(cb: (data: string) => void): PtyDataDisposer;
onExit(
Expand Down Expand Up @@ -101,13 +96,6 @@ function makeDaemonPty(
// Daemon may have disconnected; surface via the next op.
}
},
ackOutput(bytes) {
try {
daemon.ackOutput(sessionId, bytes);
} catch {
// Daemon may have disconnected; reconnect path handles recovery.
}
},
kill(signal) {
return daemon.close(sessionId, toDaemonSignal(signal));
},
Expand Down Expand Up @@ -172,7 +160,6 @@ function getHostAgentHookUrl(): string {
type TerminalClientMessage =
| { type: "input"; data: string }
| { type: "resize"; cols: number; rows: number }
| { type: "output-ack"; bytes: number }
| { type: "dispose" };

// PTY output bytes travel as binary WebSocket frames — the renderer pipes
Expand All @@ -187,6 +174,14 @@ type TerminalServerMessage =
| { type: "title"; title: string | null };

const MAX_BUFFER_BYTES = 64 * 1024;
// Cap on a single renderer socket's unflushed WebSocket send buffer. With no
// ACK flow control, a renderer that stops draining (slow paint, pinned main
// thread, dead tab) would let this buffer grow without bound → host OOM (the
// risk #4868 was about). Once a socket blows past this, we drop it; the
// renderer auto-reconnects and replays the bounded tail buffer. Crucially the
// PTY is never paused, so a stalled renderer can't wedge the shell. Matches the
// daemon's own 8 MB outbound socket cap.
const WS_SEND_BUFFER_CAP_BYTES = 8 * 1024 * 1024;
const SOCKET_OPEN = 1;
const SOCKET_CLOSING = 2;
const SOCKET_CLOSED = 3;
Expand All @@ -196,10 +191,13 @@ const MIN_TERMINAL_COLS = 20;
const MIN_TERMINAL_ROWS = 5;

// `<ArrayBuffer>` narrowing matches hono/ws's WSContext.send signature.
// `raw` is the underlying `ws` WebSocket (present for node-ws); we read
// `bufferedAmount` off it to bound a slow renderer's send queue.
type TerminalSocket = {
send: (data: string | Uint8Array<ArrayBuffer>) => void;
close: (code?: number, reason?: string) => void;
readyState: number;
raw?: { readonly bufferedAmount?: number };
};

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -707,6 +705,11 @@ function sendBytes(socket: TerminalSocket, bytes: Uint8Array) {
socket.send(asArrayBufferBytes(bytes));
}

function socketBufferedAmount(socket: TerminalSocket): number {
const amount = socket.raw?.bufferedAmount;
return typeof amount === "number" ? amount : 0;
}
Comment on lines +708 to +711

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Silent 0 fallback disables OOM protection when raw is absent

When socket.raw is undefined (any non-node-ws adapter, or if the underlying implementation doesn't expose raw), this returns 0, so socketBufferedAmount(socket) > WS_SEND_BUFFER_CAP_BYTES is always false and the drop-on-cap path never fires. The removed ACK flow-control mechanism has no fallback in that situation, so a lagging socket accumulates data without bound. In production this appears safe because node-ws always provides raw, but the failure is completely silent — there's no log, assertion, or metric to catch the missing guard if the socket type ever changes.

Prompt To Fix With AI
This is a comment left during a code review.
Path: packages/host-service/src/terminal/terminal.ts
Line: 708-711

Comment:
**Silent 0 fallback disables OOM protection when `raw` is absent**

When `socket.raw` is `undefined` (any non-node-ws adapter, or if the underlying implementation doesn't expose `raw`), this returns `0`, so `socketBufferedAmount(socket) > WS_SEND_BUFFER_CAP_BYTES` is always `false` and the drop-on-cap path never fires. The removed ACK flow-control mechanism has no fallback in that situation, so a lagging socket accumulates data without bound. In production this appears safe because node-ws always provides `raw`, but the failure is completely silent — there's no log, assertion, or metric to catch the missing guard if the socket type ever changes.

How can I resolve this? If you propose a fix, please make it concise.


function broadcastBytes(session: TerminalSession, bytes: Uint8Array): number {
let sent = 0;
const tight = asArrayBufferBytes(bytes);
Expand All @@ -720,6 +723,19 @@ function broadcastBytes(session: TerminalSession, bytes: Uint8Array): number {
}
continue;
}
// A renderer that can't keep up lets its send buffer grow without bound.
// Drop it past the cap rather than buffer forever; it reconnects and
// replays the tail. Returning this chunk as "not sent" routes it to the
// bounded replay buffer via the caller's broadcast-or-buffer check.
if (socketBufferedAmount(socket) > WS_SEND_BUFFER_CAP_BYTES) {
session.sockets.delete(socket);
try {
socket.close(1013, "terminal output back-pressure");
} catch {
// best-effort; close may race an already-closing socket
}
continue;
}
socket.send(tight);
sent += 1;
}
Expand Down Expand Up @@ -1280,7 +1296,7 @@ export async function createTerminalSessionInternal({

session.unsubscribeDaemon = daemon.subscribe(
terminalId,
{ replay: replayOnAdoption, flowControl: true },
{ replay: replayOnAdoption },
{
onOutput(chunk) {
// Bytes flow daemon → host → xterm without UTF-8 decoding;
Expand Down Expand Up @@ -1597,11 +1613,6 @@ export function registerWorkspaceTerminalRoute({
const session = sessions.get(terminalId ?? "");
if (!session || !session.sockets.has(ws)) return;

if (message.type === "output-ack") {
session.pty.ackOutput(message.bytes);
return;
}

if (message.type === "dispose") {
disposeSession(terminalId ?? "", db);
return;
Expand Down
2 changes: 1 addition & 1 deletion packages/pty-daemon/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"build:daemon": "bun run build.ts",
"typecheck": "tsc --noEmit --emitDeclarationOnly false",
"test": "bun test src/protocol src/SessionStore src/handlers src/Pty/Pty.test.ts test/no-encoding-hops.test.ts",
"test:integration": "node --experimental-strip-types --test test/integration.test.ts test/control-plane.test.ts test/signal-recovery.test.ts test/byte-fidelity.test.ts test/handoff.test.ts test/flow-control.test.ts"
"test:integration": "node --experimental-strip-types --test test/integration.test.ts test/control-plane.test.ts test/signal-recovery.test.ts test/byte-fidelity.test.ts test/handoff.test.ts"
},
"dependencies": {
"node-pty": "1.1.0"
Expand Down
23 changes: 0 additions & 23 deletions packages/pty-daemon/src/Pty/Pty.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,6 @@ export interface Pty {
readonly pid: number;
readonly meta: SessionMeta;
write(data: Buffer): void;
/**
* Stop reading from the PTY master fd. The shell (slave side) eventually
* blocks on `write` once the kernel pipe buffer fills — real upstream
* back-pressure, not a software flag.
*/
pause(): void;
resume(): void;
resize(cols: number, rows: number): void;
kill(signal?: NodeJS.Signals): void;
onData(cb: PtyOnData): void;
Expand Down Expand Up @@ -91,14 +84,6 @@ class NodePtyAdapter implements Pty {
this.term.write(data as unknown as string);
}

pause(): void {
this.term.pause();
}

resume(): void {
this.term.resume();
}

resize(cols: number, rows: number): void {
validateDims(cols, rows);
this.term.resize(cols, rows);
Expand Down Expand Up @@ -268,14 +253,6 @@ class AdoptedPty implements Pty {
return this.fd;
}

pause(): void {
this.reader.pause();
}

resume(): void {
this.reader.resume();
}

write(data: Buffer): void {
if (this.exitFired) {
throw new Error(`session exited: ${this.pid}`);
Expand Down
Loading
Loading