Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion apps/desktop/src/renderer/lib/terminal/terminal-ws-transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,14 @@ function attachSocketListeners(
// channel; renderer treats them identically). Pipe straight into
// xterm without any decoding step.
if (event.data instanceof ArrayBuffer) {
terminal.write(new Uint8Array(event.data));
// xterm.write's callback only fires once the parser has consumed
// these bytes into its buffer. That's the honest "consumed" signal
// for byte-level back-pressure — ACKing on WS receipt would lie
// about xterm being the bottleneck.
const bytes = event.data.byteLength;
terminal.write(new Uint8Array(event.data), () => {
sendOutputAck(transport, socket, bytes);
});
transport._hasReceivedBytes = true;
return;
}
Expand Down Expand Up @@ -384,6 +391,17 @@ function attachSocketListeners(
});
}

function sendOutputAck(
transport: TerminalTransport,
socket: WebSocket,
bytes: number,
) {
if (transport.socket !== socket) return;
if (socket.readyState !== WebSocket.OPEN) return;
if (transport.connectionState !== "open") return;
socket.send(JSON.stringify({ type: "output-ack", bytes }));
}

export function disconnect(transport: TerminalTransport) {
cancelReconnect(transport);
if (transport.socket) {
Expand Down
23 changes: 21 additions & 2 deletions packages/host-service/src/terminal/DaemonClient/DaemonClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,15 +186,29 @@ export class DaemonClient {
this.send({ type: "resize", id, cols, rows });
}

/**
* Fire-and-forget back-pressure ack. Tells the daemon that `bytes` of
* output for this session have been consumed by the renderer. Only
* meaningful when the subscription was opened with `flowControl: true`;
* otherwise the daemon has no counter to credit and ignores the message.
*/
ackOutput(id: string, bytes: number): void {
if (!Number.isFinite(bytes) || bytes <= 0) return;
this.send({ type: "ack-output", id, bytes: Math.floor(bytes) });
}

/**
* Subscribe to a session's output + exit stream. Returns an unsubscribe
* function. With `replay: true` the daemon sends its current ring buffer
* before live streaming begins. Multiple subscribers per session are
* supported — the daemon fans output out to all of them.
*
* `flowControl: true` opts this subscription into byte-level ACK back-
* pressure (caller must invoke ackOutput as bytes are consumed).
*/
subscribe(
id: string,
opts: { replay: boolean },
opts: { replay: boolean; flowControl?: boolean },
cb: SubscribeCallbacks,
): () => void {
let entry = this.callbacks.get(id);
Expand All @@ -218,7 +232,12 @@ export class DaemonClient {
);
}
if (wasFirst) {
this.send({ type: "subscribe", id, replay: opts.replay });
this.send({
type: "subscribe",
id,
replay: opts.replay,
flowControl: opts.flowControl === true,
});
}
return () => {
const e = this.callbacks.get(id);
Expand Down
20 changes: 19 additions & 1 deletion packages/host-service/src/terminal/terminal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ interface DaemonPty {
*/
writeBytes(bytes: Uint8Array): void;
resize(cols: number, rows: number): void;
/**
* Forward the renderer's "I consumed N bytes" ack to the daemon's flow-
* control counter for the primary subscription on this session.
*/
ackOutput(bytes: number): void;
kill(signal?: NodeJS.Signals): Promise<void>;
onData(cb: (data: string) => void): PtyDataDisposer;
onExit(
Expand Down Expand Up @@ -96,6 +101,13 @@ function makeDaemonPty(
// Daemon may have disconnected; surface via the next op.
}
},
ackOutput(bytes) {
try {
daemon.ackOutput(sessionId, bytes);
} catch {
// Daemon may have disconnected; reconnect path handles recovery.
}
},
kill(signal) {
return daemon.close(sessionId, toDaemonSignal(signal));
},
Expand Down Expand Up @@ -160,6 +172,7 @@ function getHostAgentHookUrl(): string {
type TerminalClientMessage =
| { type: "input"; data: string }
| { type: "resize"; cols: number; rows: number }
| { type: "output-ack"; bytes: number }
| { type: "dispose" };

// PTY output bytes travel as binary WebSocket frames — the renderer pipes
Expand Down Expand Up @@ -1274,7 +1287,7 @@ export async function createTerminalSessionInternal({

session.unsubscribeDaemon = daemon.subscribe(
terminalId,
{ replay: replayOnAdoption },
{ replay: replayOnAdoption, flowControl: true },
{
onOutput(chunk) {
// Bytes flow daemon → host → xterm without UTF-8 decoding;
Expand Down Expand Up @@ -1591,6 +1604,11 @@ export function registerWorkspaceTerminalRoute({
const session = sessions.get(terminalId ?? "");
if (!session || !session.sockets.has(ws)) return;

if (message.type === "output-ack") {
session.pty.ackOutput(message.bytes);
return;
}

if (message.type === "dispose") {
disposeSession(terminalId ?? "", db);
return;
Expand Down
2 changes: 1 addition & 1 deletion packages/pty-daemon/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"build:daemon": "bun run build.ts",
"typecheck": "tsc --noEmit --emitDeclarationOnly false",
"test": "bun test src/protocol src/SessionStore src/handlers src/Pty/Pty.test.ts test/no-encoding-hops.test.ts",
"test:integration": "node --experimental-strip-types --test test/integration.test.ts test/control-plane.test.ts test/signal-recovery.test.ts test/byte-fidelity.test.ts test/handoff.test.ts"
"test:integration": "node --experimental-strip-types --test test/integration.test.ts test/control-plane.test.ts test/signal-recovery.test.ts test/byte-fidelity.test.ts test/handoff.test.ts test/flow-control.test.ts"
},
"dependencies": {
"node-pty": "1.1.0"
Expand Down
23 changes: 23 additions & 0 deletions packages/pty-daemon/src/Pty/Pty.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ export interface Pty {
readonly pid: number;
readonly meta: SessionMeta;
write(data: Buffer): void;
/**
* Stop reading from the PTY master fd. The shell (slave side) eventually
* blocks on `write` once the kernel pipe buffer fills — real upstream
* back-pressure, not a software flag.
*/
pause(): void;
resume(): void;
resize(cols: number, rows: number): void;
kill(signal?: NodeJS.Signals): void;
onData(cb: PtyOnData): void;
Expand Down Expand Up @@ -84,6 +91,14 @@ class NodePtyAdapter implements Pty {
this.term.write(data as unknown as string);
}

pause(): void {
this.term.pause();
}

resume(): void {
this.term.resume();
}

resize(cols: number, rows: number): void {
validateDims(cols, rows);
this.term.resize(cols, rows);
Expand Down Expand Up @@ -253,6 +268,14 @@ class AdoptedPty implements Pty {
return this.fd;
}

pause(): void {
this.reader.pause();
}

resume(): void {
this.reader.resume();
}

write(data: Buffer): void {
if (this.exitFired) {
throw new Error(`session exited: ${this.pid}`);
Expand Down
93 changes: 90 additions & 3 deletions packages/pty-daemon/src/Server/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
} from "../handlers/index.ts";
import { adoptFromFd } from "../Pty/index.ts";
import {
type AckOutputMessage,
type ClientMessage,
encodeFrame,
FrameDecoder,
Expand Down Expand Up @@ -45,6 +46,12 @@ export interface ServerOptions {

const DEFAULT_OUTBOUND_BUFFER_CAP_BYTES = 8 * 1024 * 1024;

// Watermarks for output back-pressure. Mirrors VS Code's terminal flow-control
// defaults (their tracker counts characters; we count bytes because the wire
// is byte-native). 100KB high → pause the PTY; 5KB low → resume.
const FLOW_CONTROL_HIGH_WATERMARK = 100_000;
const FLOW_CONTROL_LOW_WATERMARK = 5_000;

interface ConnState extends Conn {
socket: net.Socket;
decoder: FrameDecoder;
Expand All @@ -56,6 +63,8 @@ export class Server {
private readonly store: SessionStore;
private readonly conns = new Set<ConnState>();
private readonly opts: ServerOptions;
/** Sessions whose PTY is currently paused by flow control. */
private readonly pausedSessions = new Set<string>();

constructor(opts: ServerOptions) {
this.opts = opts;
Expand Down Expand Up @@ -317,6 +326,7 @@ export class Server {
decoder: new FrameDecoder(),
negotiated: null,
subscriptions: new Set(),
flowControlUnacked: new Map(),
send: (msg, payload) =>
writeMessage(socket, msg, payload, outboundBufferCap),
};
Expand All @@ -338,10 +348,10 @@ export class Server {
}
});
socket.on("close", () => {
this.conns.delete(conn);
this.dropConn(conn);
});
socket.on("error", () => {
this.conns.delete(conn);
this.dropConn(conn);
});
}

Expand Down Expand Up @@ -415,6 +425,11 @@ export class Server {
}
case "unsubscribe": {
handleUnsubscribe(conn, msg);
this.maybeResume(msg.id);
return;
}
case "ack-output": {
this.handleAckOutput(conn, msg);
return;
}
case "prepare-upgrade": {
Expand Down Expand Up @@ -464,8 +479,14 @@ export class Server {
this.store.appendOutput(session, chunk);
const out: ServerMessage = { type: "output", id: session.id };
for (const c of this.conns) {
if (c.subscriptions.has(session.id)) c.send(out, chunk);
if (!c.subscriptions.has(session.id)) continue;
c.send(out, chunk);
const prev = c.flowControlUnacked.get(session.id);
if (prev !== undefined) {
c.flowControlUnacked.set(session.id, prev + chunk.byteLength);
}
}
this.maybePause(session);
});
session.pty.onExit((info) => {
session.exited = true;
Expand All @@ -481,8 +502,10 @@ export class Server {
if (c.subscriptions.has(session.id)) {
c.send(ev);
c.subscriptions.delete(session.id);
c.flowControlUnacked.delete(session.id);
}
}
this.pausedSessions.delete(session.id);
// Delete the session immediately. Without this, every closed
// terminal pane left a row in the store forever — list-reply
// inflated, memory grew unbounded.
Expand All @@ -496,6 +519,70 @@ export class Server {
this.store.delete(session.id);
});
}

private handleAckOutput(conn: ConnState, msg: AckOutputMessage): void {
if (
!Number.isFinite(msg.bytes) ||
!Number.isInteger(msg.bytes) ||
msg.bytes <= 0
) {
conn.send({
type: "error",
id: msg.id,
message: `invalid ack-output bytes: ${String(msg.bytes)}`,
code: "EPROTO",
});
return;
}
const outstanding = conn.flowControlUnacked.get(msg.id);
if (outstanding === undefined) return;
const next = Math.max(0, outstanding - msg.bytes);
conn.flowControlUnacked.set(msg.id, next);
this.maybeResume(msg.id);
}

private maybePause(session: Session): void {
if (this.pausedSessions.has(session.id)) return;
for (const c of this.conns) {
const u = c.flowControlUnacked.get(session.id);
if (u !== undefined && u > FLOW_CONTROL_HIGH_WATERMARK) {
try {
session.pty.pause();
} catch {
// already exited; nothing to back-pressure
}
this.pausedSessions.add(session.id);
return;
}
}
}

private maybeResume(sessionId: string): void {
if (!this.pausedSessions.has(sessionId)) return;
for (const c of this.conns) {
const u = c.flowControlUnacked.get(sessionId);
if (u !== undefined && u > FLOW_CONTROL_LOW_WATERMARK) return;
}
const session = this.store.get(sessionId);
this.pausedSessions.delete(sessionId);
if (session && !session.exited) {
try {
session.pty.resume();
} catch {
// already exited; nothing to resume
}
}
}

private dropConn(conn: ConnState): void {
if (!this.conns.delete(conn)) return;
// A dropped conn's unacked bytes effectively become "consumed" as far
// as the back-pressure decision is concerned — clearing them lets any
// PTY that was paused on this conn's behalf resume.
const sessionIds = Array.from(conn.flowControlUnacked.keys());
conn.flowControlUnacked.clear();
for (const id of sessionIds) this.maybeResume(id);
}
}

/**
Expand Down
2 changes: 2 additions & 0 deletions packages/pty-daemon/src/SessionStore/SessionStore.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ function fakePty(meta: { cols: number; rows: number }): Pty {
rows: meta.rows,
},
write: () => {},
pause: () => {},
resume: () => {},
resize: () => {},
kill: () => {},
onData: () => {},
Expand Down
2 changes: 2 additions & 0 deletions packages/pty-daemon/src/SessionStore/snapshot.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ function fakePty(pid: number, meta: { cols: number; rows: number }): Pty {
pid,
meta: { shell: "/bin/sh", argv: [], cols: meta.cols, rows: meta.rows },
write: () => {},
pause: () => {},
resume: () => {},
resize: () => {},
kill: () => {},
onData: () => {},
Expand Down
3 changes: 3 additions & 0 deletions packages/pty-daemon/src/handlers/handlers.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ function makeFakePty(state: FakePtyState, meta: SpawnOptions["meta"]): Pty {
pid: state.pid,
meta,
write: (b) => state.written.push(b),
pause: () => {},
resume: () => {},
resize: (c, r) => {
state.cols = c;
state.rows = r;
Expand All @@ -51,6 +53,7 @@ function makeConn(): Conn & { sent: SentFrame[] } {
return {
sent,
subscriptions: new Set(),
flowControlUnacked: new Map(),
send: (m, payload) => sent.push({ message: m, payload: payload ?? null }),
};
}
Expand Down
Loading
Loading