From d98affb2e5151adfe6736efc6bf8cd889bee53d7 Mon Sep 17 00:00:00 2001 From: Kiet Ho Date: Sun, 29 Mar 2026 12:35:48 -0700 Subject: [PATCH 1/5] fix(desktop): skip writes to backpressured terminal sockets and rate-limit warnings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Combines the fixes from #2969 and #2962 into a single changeset: 1. Skip writes to backpressured sockets (#2969): When a client socket signals backpressure (write returns false), subsequent broadcastEvent calls skip that socket entirely instead of growing Node's internal write buffer without bound. The terminal emulator still processes all data so snapshot state stays consistent — the next TUI repaint after drain naturally resyncs the display. 2. Rate-limit backpressure warnings (#2962): Replace unbounded console.warn on every backpressure event with a rate-limited warnBackpressure() method. Only one warning is emitted per 5-second window; subsequent occurrences are counted and reported in the next warning (e.g. '247 similar warnings suppressed'). Under sustained high-output commands, a single pane could previously generate 400k+ identical warnings flooding the daemon log. Tests cover: writes skipped during backpressure, writes resume after drain, warning rate-limiting within the 5s window, and suppressed count reporting after the window elapses. Closes #2969 Closes #2962 Co-Authored-By: Mastra Code (anthropic/claude-opus-4-6) --- .../src/main/terminal-host/session.test.ts | 202 +++++++++++++++++- .../desktop/src/main/terminal-host/session.ts | 48 ++++- 2 files changed, 243 insertions(+), 7 deletions(-) diff --git a/apps/desktop/src/main/terminal-host/session.test.ts b/apps/desktop/src/main/terminal-host/session.test.ts index 625d472ad10..554d0d7f6e0 100644 --- a/apps/desktop/src/main/terminal-host/session.test.ts +++ b/apps/desktop/src/main/terminal-host/session.test.ts @@ -1,4 +1,4 @@ -import { beforeEach, describe, expect, it } from "bun:test"; +import { afterEach, beforeEach, describe, expect, it, mock } from "bun:test"; import type { ChildProcess } from "node:child_process"; import { EventEmitter } from "node:events"; import path from "node:path"; @@ -12,7 +12,14 @@ import "./xterm-env-polyfill"; const { Session } = await import("./session"); -class FakeStdout extends EventEmitter {} +class FakeStdout extends EventEmitter { + pause() { + return this; + } + resume() { + return this; + } +} class FakeStdin extends EventEmitter { readonly writes: Buffer[] = []; @@ -219,3 +226,194 @@ describe("Terminal Host Session shell args", () => { expect(writes.some((message) => message.includes('"hello"'))).toBe(true); }); }); + +// ============================================================================= +// Backpressure tests (#2968 + #2961) +// ============================================================================= + +describe("Terminal Host Session backpressure", () => { + let warnSpy: ReturnType; + + beforeEach(() => { + warnSpy = mock(); + console.warn = warnSpy; + }); + + afterEach(() => { + warnSpy.mockRestore?.(); + }); + + class FakeSocket extends EventEmitter { + readonly writes: string[] = []; + remoteAddress = "127.0.0.1"; + remotePort = 9999; + private writeFn: (message: string) => boolean; + + constructor(writeFn: (message: string, writes: string[]) => boolean) { + super(); + this.writeFn = (msg) => writeFn(msg, this.writes); + } + + write(message: string): boolean { + this.writes.push(message); + return this.writeFn(message); + } + + destroy() {} + } + + function createSessionWithSocket( + writeFn: (message: string, writes: string[]) => boolean, + ) { + const child = new FakeChildProcess(); + const session = new Session({ + sessionId: "session-bp", + workspaceId: "workspace-1", + paneId: "pane-1", + tabId: "tab-1", + cols: 80, + rows: 24, + cwd: "/tmp", + shell: "/bin/bash", + spawnProcess: () => child as unknown as ChildProcess, + }); + + session.spawn({ + cwd: "/tmp", + cols: 80, + rows: 24, + env: { PATH: "/usr/bin" }, + }); + + // Emit Ready so we can attach + child.stdout.emit("data", createFrameHeader(PtySubprocessIpcType.Ready, 0)); + + const fakeSocket = new FakeSocket( + writeFn, + ) as unknown as import("node:net").Socket; + + // Directly inject the socket as an attached client + ( + session as unknown as { + attachedClients: Map< + import("node:net").Socket, + { socket: import("node:net").Socket } + >; + } + ).attachedClients.set(fakeSocket, { socket: fakeSocket }); + + const broadcast = (data: string) => { + ( + session as unknown as { + broadcastEvent: ( + eventType: string, + payload: { type: "data"; data: string }, + ) => void; + } + ).broadcastEvent("data", { type: "data", data }); + }; + + return { session, socket: fakeSocket, broadcast }; + } + + it("stops writing to a backpressured socket instead of growing the buffer", () => { + // First write succeeds, subsequent ones signal backpressure + const { socket, broadcast } = createSessionWithSocket( + (_msg, writes) => writes.length <= 1, + ); + const fakeSocket = socket as unknown as FakeSocket; + + // First broadcast: write succeeds + broadcast("frame-1"); + expect(fakeSocket.writes).toHaveLength(1); + + // Second broadcast: write returns false → socket becomes backpressured + broadcast("frame-2"); + expect(fakeSocket.writes).toHaveLength(2); + + // Subsequent broadcasts should be SKIPPED — not written to the socket + broadcast("frame-3"); + broadcast("frame-4"); + broadcast("frame-5"); + expect(fakeSocket.writes).toHaveLength(2); + }); + + it("resumes writing after the socket drains", () => { + // First write succeeds, second backpressures, after drain writes succeed again + const { socket, broadcast } = createSessionWithSocket( + (_msg, writes) => writes.length !== 2, + ); + const fakeSocket = socket as unknown as FakeSocket; + + broadcast("frame-1"); // write #1 → succeeds (length 1 !== 2) + broadcast("frame-2"); // write #2 → returns false (length 2 === 2) + + // Skipped during backpressure + broadcast("frame-3"); + broadcast("frame-4"); + expect(fakeSocket.writes).toHaveLength(2); + + // Simulate drain — triggers the once("drain") handler which removes + // the socket from clientSocketsWaitingForDrain + fakeSocket.emit("drain"); + + // After drain, new broadcasts write again (write #3 → succeeds) + broadcast("frame-5"); + expect(fakeSocket.writes).toHaveLength(3); + expect(fakeSocket.writes[2]).toContain("frame-5"); + }); + + it("emits only one backpressure warning within the rate-limit window", () => { + const { broadcast } = createSessionWithSocket(() => false); + + for (let i = 0; i < 1000; i++) { + broadcast(`chunk-${i}`); + } + + const backpressureWarns = (warnSpy.mock.calls as unknown[][]).filter( + (call) => + typeof call[0] === "string" && + call[0].includes("Client socket buffer full"), + ); + + // Should emit exactly 1 warning, not 1000 + expect(backpressureWarns.length).toBe(1); + }); + + it("includes suppressed count when warning resumes after interval", () => { + // Write always returns false so every non-skipped write triggers backpressure. + // We need to drain between writes to avoid the skip-backpressured-socket optimization. + const { session, socket, broadcast } = createSessionWithSocket(() => false); + const fakeSocket = socket as unknown as FakeSocket; + + // First broadcast: writes, gets backpressured, warns immediately + broadcast("first"); + + // Drain + re-broadcast many times within the rate-limit window to + // accumulate suppressed warnings + for (let i = 0; i < 50; i++) { + fakeSocket.emit("drain"); + broadcast(`suppressed-${i}`); + } + + // Advance past the rate-limit window + ( + session as unknown as { backpressureWarnLastAt: number } + ).backpressureWarnLastAt = Date.now() - 10_000; + + // Drain once more, then broadcast — should emit with suppressed count + fakeSocket.emit("drain"); + broadcast("after-interval"); + + const backpressureWarns = (warnSpy.mock.calls as unknown[][]).filter( + (call) => + typeof call[0] === "string" && + call[0].includes("Client socket buffer full"), + ); + + expect(backpressureWarns.length).toBe(2); + const lastWarn = backpressureWarns[1]?.[0] as string; + expect(lastWarn).toContain("suppressed"); + expect(lastWarn).toContain("50"); + }); +}); diff --git a/apps/desktop/src/main/terminal-host/session.ts b/apps/desktop/src/main/terminal-host/session.ts index c7da037418e..3e028d0ecae 100644 --- a/apps/desktop/src/main/terminal-host/session.ts +++ b/apps/desktop/src/main/terminal-host/session.ts @@ -54,6 +54,12 @@ const ATTACH_FLUSH_TIMEOUT_MS = 500; */ const MAX_SUBPROCESS_STDIN_QUEUE_BYTES = 2_000_000; +/** + * Minimum interval (ms) between backpressure warnings per session. + * Prevents log flooding when a high-output pane keeps the socket buffer full. + */ +const BACKPRESSURE_WARN_INTERVAL_MS = 5_000; + /** * How long to wait for the shell-ready marker before unblocking writes. * 15s covers heavy setups like Nix-based devenv via direnv. On timeout, @@ -127,6 +133,8 @@ export class Session { private attachedClients: Map = new Map(); private clientSocketsWaitingForDrain: Set = new Set(); private subprocessStdoutPaused = false; + private backpressureWarnLastAt = 0; + private backpressureWarnSuppressed = 0; private lastAttachedAt: Date; private exitCode: number | null = null; private disposed = false; @@ -1065,13 +1073,18 @@ export class Session { for (const { socket } of this.attachedClients.values()) { try { + // Skip sockets that are already backpressured. Continuing to write + // would grow Node's internal buffer without bound, and the massive + // flush on drain causes a visible freeze / catch-up stall (#2968). + // The emulator still processes all data so snapshot state stays + // consistent — the next TUI repaint after drain resyncs the display. + if (this.clientSocketsWaitingForDrain.has(socket)) { + continue; + } + const canWrite = socket.write(message); if (!canWrite) { - // Socket buffer full - data will be queued but may cause memory pressure - // In production, could track this and pause PTY output temporarily - console.warn( - `[Session ${this.sessionId}] Client socket buffer full, output may be delayed`, - ); + this.warnBackpressure(); this.handleClientBackpressure(socket); } } catch { @@ -1108,6 +1121,31 @@ export class Session { this.subprocess.stdout.resume(); } + /** + * Rate-limited backpressure warning to prevent log flooding. + * Emits at most one warning per BACKPRESSURE_WARN_INTERVAL_MS, + * reporting how many warnings were suppressed since the last emission. + */ + private warnBackpressure(): void { + const now = Date.now(); + if (now - this.backpressureWarnLastAt < BACKPRESSURE_WARN_INTERVAL_MS) { + this.backpressureWarnSuppressed++; + return; + } + + const suppressed = this.backpressureWarnSuppressed; + this.backpressureWarnSuppressed = 0; + this.backpressureWarnLastAt = now; + + const suffix = + suppressed > 0 + ? ` (${suppressed} similar warning${suppressed === 1 ? "" : "s"} suppressed)` + : ""; + console.warn( + `[Session ${this.sessionId}] Client socket buffer full, output may be delayed${suffix}`, + ); + } + /** * Get default shell for the platform */ From 69ed33a174523559bef540e2a8d97521ed86b71e Mon Sep 17 00:00:00 2001 From: Kiet Ho Date: Sun, 29 Mar 2026 17:13:58 -0700 Subject: [PATCH 2/5] fix(desktop): preserve terminal lifecycle events under backpressure --- .../src/main/terminal-host/session.test.ts | 58 ++++++++++++++++--- .../desktop/src/main/terminal-host/session.ts | 17 ++++-- 2 files changed, 61 insertions(+), 14 deletions(-) diff --git a/apps/desktop/src/main/terminal-host/session.test.ts b/apps/desktop/src/main/terminal-host/session.test.ts index 554d0d7f6e0..9142d23b4da 100644 --- a/apps/desktop/src/main/terminal-host/session.test.ts +++ b/apps/desktop/src/main/terminal-host/session.test.ts @@ -233,14 +233,16 @@ describe("Terminal Host Session shell args", () => { describe("Terminal Host Session backpressure", () => { let warnSpy: ReturnType; + let originalWarn: typeof console.warn; beforeEach(() => { + originalWarn = console.warn; warnSpy = mock(); console.warn = warnSpy; }); afterEach(() => { - warnSpy.mockRestore?.(); + console.warn = originalWarn; }); class FakeSocket extends EventEmitter { @@ -265,6 +267,11 @@ describe("Terminal Host Session backpressure", () => { function createSessionWithSocket( writeFn: (message: string, writes: string[]) => boolean, ) { + type BroadcastPayload = + | { type: "data"; data: string } + | { type: "error"; error: string; code?: string } + | { type: "exit"; exitCode: number; signal?: number }; + const child = new FakeChildProcess(); const session = new Session({ sessionId: "session-bp", @@ -297,23 +304,38 @@ describe("Terminal Host Session backpressure", () => { session as unknown as { attachedClients: Map< import("node:net").Socket, - { socket: import("node:net").Socket } + { + socket: import("node:net").Socket; + attachedAt: number; + attachToken: symbol; + } >; } - ).attachedClients.set(fakeSocket, { socket: fakeSocket }); + ).attachedClients.set(fakeSocket, { + socket: fakeSocket, + attachedAt: Date.now(), + attachToken: Symbol("test-attach"), + }); - const broadcast = (data: string) => { + const broadcastEvent = ( + eventType: BroadcastPayload["type"], + payload: BroadcastPayload, + ) => { ( session as unknown as { broadcastEvent: ( eventType: string, - payload: { type: "data"; data: string }, + payload: BroadcastPayload, ) => void; } - ).broadcastEvent("data", { type: "data", data }); + ).broadcastEvent(eventType, payload); }; - return { session, socket: fakeSocket, broadcast }; + const broadcast = (data: string) => { + broadcastEvent("data", { type: "data", data }); + }; + + return { session, socket: fakeSocket, broadcast, broadcastEvent }; } it("stops writing to a backpressured socket instead of growing the buffer", () => { @@ -363,7 +385,27 @@ describe("Terminal Host Session backpressure", () => { expect(fakeSocket.writes[2]).toContain("frame-5"); }); - it("emits only one backpressure warning within the rate-limit window", () => { + it("still delivers exit and error events while socket is waiting for drain", () => { + const { socket, broadcast, broadcastEvent } = createSessionWithSocket( + (_msg, writes) => writes.length <= 1, + ); + const fakeSocket = socket as unknown as FakeSocket; + + broadcast("frame-1"); + broadcast("frame-2"); + expect(fakeSocket.writes).toHaveLength(2); + + broadcastEvent("exit", { type: "exit", exitCode: 0 }); + broadcastEvent("error", { type: "error", error: "boom" }); + + expect(fakeSocket.writes).toHaveLength(4); + expect(fakeSocket.writes[2]).toContain('"event":"exit"'); + expect(fakeSocket.writes[2]).toContain('"exitCode":0'); + expect(fakeSocket.writes[3]).toContain('"event":"error"'); + expect(fakeSocket.writes[3]).toContain('"error":"boom"'); + }); + + it("emits only one backpressure warning while socket remains backpressured", () => { const { broadcast } = createSessionWithSocket(() => false); for (let i = 0; i < 1000; i++) { diff --git a/apps/desktop/src/main/terminal-host/session.ts b/apps/desktop/src/main/terminal-host/session.ts index 3e028d0ecae..0619bf74f6b 100644 --- a/apps/desktop/src/main/terminal-host/session.ts +++ b/apps/desktop/src/main/terminal-host/session.ts @@ -1000,6 +1000,8 @@ export class Session { this.subprocessStdinQueuedBytes = 0; this.subprocessStdinDrainArmed = false; this.subprocessStdoutPaused = false; + this.backpressureWarnLastAt = 0; + this.backpressureWarnSuppressed = 0; this.emulatorWriteQueue = []; this.emulatorWriteQueuedBytes = 0; @@ -1073,12 +1075,15 @@ export class Session { for (const { socket } of this.attachedClients.values()) { try { - // Skip sockets that are already backpressured. Continuing to write - // would grow Node's internal buffer without bound, and the massive - // flush on drain causes a visible freeze / catch-up stall (#2968). - // The emulator still processes all data so snapshot state stays - // consistent — the next TUI repaint after drain resyncs the display. - if (this.clientSocketsWaitingForDrain.has(socket)) { + // Skip terminal data while a client is backpressured. Continuing + // to queue screen updates would grow Node's internal buffer without + // bound, and the massive flush on drain causes a visible freeze / + // catch-up stall (#2968). Lifecycle events still need to be + // delivered even if the client is temporarily behind. + if ( + eventType === "data" && + this.clientSocketsWaitingForDrain.has(socket) + ) { continue; } From bfeaa392ebed84fd8ef8e01695ac0a79cf9fd56e Mon Sep 17 00:00:00 2001 From: Kiet Ho Date: Sun, 29 Mar 2026 20:08:38 -0700 Subject: [PATCH 3/5] fix(desktop): preserve terminal output under backpressure --- .../src/main/terminal-host/session.test.ts | 62 ++++++++++++------- .../desktop/src/main/terminal-host/session.ts | 12 ---- 2 files changed, 39 insertions(+), 35 deletions(-) diff --git a/apps/desktop/src/main/terminal-host/session.test.ts b/apps/desktop/src/main/terminal-host/session.test.ts index 9142d23b4da..d0c92d74a06 100644 --- a/apps/desktop/src/main/terminal-host/session.test.ts +++ b/apps/desktop/src/main/terminal-host/session.test.ts @@ -338,9 +338,9 @@ describe("Terminal Host Session backpressure", () => { return { session, socket: fakeSocket, broadcast, broadcastEvent }; } - it("stops writing to a backpressured socket instead of growing the buffer", () => { + it("marks the socket backpressured after a write returns false", () => { // First write succeeds, subsequent ones signal backpressure - const { socket, broadcast } = createSessionWithSocket( + const { session, socket, broadcast } = createSessionWithSocket( (_msg, writes) => writes.length <= 1, ); const fakeSocket = socket as unknown as FakeSocket; @@ -353,39 +353,53 @@ describe("Terminal Host Session backpressure", () => { broadcast("frame-2"); expect(fakeSocket.writes).toHaveLength(2); - // Subsequent broadcasts should be SKIPPED — not written to the socket - broadcast("frame-3"); - broadcast("frame-4"); - broadcast("frame-5"); - expect(fakeSocket.writes).toHaveLength(2); + expect( + ( + session as unknown as { + clientSocketsWaitingForDrain: Set; + } + ).clientSocketsWaitingForDrain.has(socket), + ).toBe(true); }); it("resumes writing after the socket drains", () => { - // First write succeeds, second backpressures, after drain writes succeed again - const { socket, broadcast } = createSessionWithSocket( + // First write succeeds, second backpressures, after drain the session + // clears the waiting state so the stdout pause can be lifted. + const { session, socket, broadcast } = createSessionWithSocket( (_msg, writes) => writes.length !== 2, ); const fakeSocket = socket as unknown as FakeSocket; broadcast("frame-1"); // write #1 → succeeds (length 1 !== 2) broadcast("frame-2"); // write #2 → returns false (length 2 === 2) - - // Skipped during backpressure - broadcast("frame-3"); - broadcast("frame-4"); expect(fakeSocket.writes).toHaveLength(2); + expect( + ( + session as unknown as { + clientSocketsWaitingForDrain: Set; + } + ).clientSocketsWaitingForDrain.has(socket), + ).toBe(true); // Simulate drain — triggers the once("drain") handler which removes // the socket from clientSocketsWaitingForDrain fakeSocket.emit("drain"); - // After drain, new broadcasts write again (write #3 → succeeds) + expect( + ( + session as unknown as { + clientSocketsWaitingForDrain: Set; + } + ).clientSocketsWaitingForDrain.has(socket), + ).toBe(false); + + // After drain, new broadcasts write again broadcast("frame-5"); expect(fakeSocket.writes).toHaveLength(3); expect(fakeSocket.writes[2]).toContain("frame-5"); }); - it("still delivers exit and error events while socket is waiting for drain", () => { + it("continues delivering data, exit, and error events while waiting for drain", () => { const { socket, broadcast, broadcastEvent } = createSessionWithSocket( (_msg, writes) => writes.length <= 1, ); @@ -393,16 +407,18 @@ describe("Terminal Host Session backpressure", () => { broadcast("frame-1"); broadcast("frame-2"); - expect(fakeSocket.writes).toHaveLength(2); + broadcast("frame-3"); + expect(fakeSocket.writes).toHaveLength(3); + expect(fakeSocket.writes[2]).toContain("frame-3"); broadcastEvent("exit", { type: "exit", exitCode: 0 }); broadcastEvent("error", { type: "error", error: "boom" }); - expect(fakeSocket.writes).toHaveLength(4); - expect(fakeSocket.writes[2]).toContain('"event":"exit"'); - expect(fakeSocket.writes[2]).toContain('"exitCode":0'); - expect(fakeSocket.writes[3]).toContain('"event":"error"'); - expect(fakeSocket.writes[3]).toContain('"error":"boom"'); + expect(fakeSocket.writes).toHaveLength(5); + expect(fakeSocket.writes[3]).toContain('"event":"exit"'); + expect(fakeSocket.writes[3]).toContain('"exitCode":0'); + expect(fakeSocket.writes[4]).toContain('"event":"error"'); + expect(fakeSocket.writes[4]).toContain('"error":"boom"'); }); it("emits only one backpressure warning while socket remains backpressured", () => { @@ -423,8 +439,8 @@ describe("Terminal Host Session backpressure", () => { }); it("includes suppressed count when warning resumes after interval", () => { - // Write always returns false so every non-skipped write triggers backpressure. - // We need to drain between writes to avoid the skip-backpressured-socket optimization. + // Write always returns false so every broadcast triggers backpressure. + // Drain between writes to simulate a client that repeatedly falls behind. const { session, socket, broadcast } = createSessionWithSocket(() => false); const fakeSocket = socket as unknown as FakeSocket; diff --git a/apps/desktop/src/main/terminal-host/session.ts b/apps/desktop/src/main/terminal-host/session.ts index 0619bf74f6b..5c5c17a5dba 100644 --- a/apps/desktop/src/main/terminal-host/session.ts +++ b/apps/desktop/src/main/terminal-host/session.ts @@ -1075,18 +1075,6 @@ export class Session { for (const { socket } of this.attachedClients.values()) { try { - // Skip terminal data while a client is backpressured. Continuing - // to queue screen updates would grow Node's internal buffer without - // bound, and the massive flush on drain causes a visible freeze / - // catch-up stall (#2968). Lifecycle events still need to be - // delivered even if the client is temporarily behind. - if ( - eventType === "data" && - this.clientSocketsWaitingForDrain.has(socket) - ) { - continue; - } - const canWrite = socket.write(message); if (!canWrite) { this.warnBackpressure(); From 84e3963154680627f666814b5c2492a0a5d7d439 Mon Sep 17 00:00:00 2001 From: Kiet Ho Date: Sun, 29 Mar 2026 20:14:13 -0700 Subject: [PATCH 4/5] test(desktop): trim terminal host session coverage --- .../src/main/terminal-host/session.test.ts | 260 +----------------- 1 file changed, 2 insertions(+), 258 deletions(-) diff --git a/apps/desktop/src/main/terminal-host/session.test.ts b/apps/desktop/src/main/terminal-host/session.test.ts index d0c92d74a06..625d472ad10 100644 --- a/apps/desktop/src/main/terminal-host/session.test.ts +++ b/apps/desktop/src/main/terminal-host/session.test.ts @@ -1,4 +1,4 @@ -import { afterEach, beforeEach, describe, expect, it, mock } from "bun:test"; +import { beforeEach, describe, expect, it } from "bun:test"; import type { ChildProcess } from "node:child_process"; import { EventEmitter } from "node:events"; import path from "node:path"; @@ -12,14 +12,7 @@ import "./xterm-env-polyfill"; const { Session } = await import("./session"); -class FakeStdout extends EventEmitter { - pause() { - return this; - } - resume() { - return this; - } -} +class FakeStdout extends EventEmitter {} class FakeStdin extends EventEmitter { readonly writes: Buffer[] = []; @@ -226,252 +219,3 @@ describe("Terminal Host Session shell args", () => { expect(writes.some((message) => message.includes('"hello"'))).toBe(true); }); }); - -// ============================================================================= -// Backpressure tests (#2968 + #2961) -// ============================================================================= - -describe("Terminal Host Session backpressure", () => { - let warnSpy: ReturnType; - let originalWarn: typeof console.warn; - - beforeEach(() => { - originalWarn = console.warn; - warnSpy = mock(); - console.warn = warnSpy; - }); - - afterEach(() => { - console.warn = originalWarn; - }); - - class FakeSocket extends EventEmitter { - readonly writes: string[] = []; - remoteAddress = "127.0.0.1"; - remotePort = 9999; - private writeFn: (message: string) => boolean; - - constructor(writeFn: (message: string, writes: string[]) => boolean) { - super(); - this.writeFn = (msg) => writeFn(msg, this.writes); - } - - write(message: string): boolean { - this.writes.push(message); - return this.writeFn(message); - } - - destroy() {} - } - - function createSessionWithSocket( - writeFn: (message: string, writes: string[]) => boolean, - ) { - type BroadcastPayload = - | { type: "data"; data: string } - | { type: "error"; error: string; code?: string } - | { type: "exit"; exitCode: number; signal?: number }; - - const child = new FakeChildProcess(); - const session = new Session({ - sessionId: "session-bp", - workspaceId: "workspace-1", - paneId: "pane-1", - tabId: "tab-1", - cols: 80, - rows: 24, - cwd: "/tmp", - shell: "/bin/bash", - spawnProcess: () => child as unknown as ChildProcess, - }); - - session.spawn({ - cwd: "/tmp", - cols: 80, - rows: 24, - env: { PATH: "/usr/bin" }, - }); - - // Emit Ready so we can attach - child.stdout.emit("data", createFrameHeader(PtySubprocessIpcType.Ready, 0)); - - const fakeSocket = new FakeSocket( - writeFn, - ) as unknown as import("node:net").Socket; - - // Directly inject the socket as an attached client - ( - session as unknown as { - attachedClients: Map< - import("node:net").Socket, - { - socket: import("node:net").Socket; - attachedAt: number; - attachToken: symbol; - } - >; - } - ).attachedClients.set(fakeSocket, { - socket: fakeSocket, - attachedAt: Date.now(), - attachToken: Symbol("test-attach"), - }); - - const broadcastEvent = ( - eventType: BroadcastPayload["type"], - payload: BroadcastPayload, - ) => { - ( - session as unknown as { - broadcastEvent: ( - eventType: string, - payload: BroadcastPayload, - ) => void; - } - ).broadcastEvent(eventType, payload); - }; - - const broadcast = (data: string) => { - broadcastEvent("data", { type: "data", data }); - }; - - return { session, socket: fakeSocket, broadcast, broadcastEvent }; - } - - it("marks the socket backpressured after a write returns false", () => { - // First write succeeds, subsequent ones signal backpressure - const { session, socket, broadcast } = createSessionWithSocket( - (_msg, writes) => writes.length <= 1, - ); - const fakeSocket = socket as unknown as FakeSocket; - - // First broadcast: write succeeds - broadcast("frame-1"); - expect(fakeSocket.writes).toHaveLength(1); - - // Second broadcast: write returns false → socket becomes backpressured - broadcast("frame-2"); - expect(fakeSocket.writes).toHaveLength(2); - - expect( - ( - session as unknown as { - clientSocketsWaitingForDrain: Set; - } - ).clientSocketsWaitingForDrain.has(socket), - ).toBe(true); - }); - - it("resumes writing after the socket drains", () => { - // First write succeeds, second backpressures, after drain the session - // clears the waiting state so the stdout pause can be lifted. - const { session, socket, broadcast } = createSessionWithSocket( - (_msg, writes) => writes.length !== 2, - ); - const fakeSocket = socket as unknown as FakeSocket; - - broadcast("frame-1"); // write #1 → succeeds (length 1 !== 2) - broadcast("frame-2"); // write #2 → returns false (length 2 === 2) - expect(fakeSocket.writes).toHaveLength(2); - expect( - ( - session as unknown as { - clientSocketsWaitingForDrain: Set; - } - ).clientSocketsWaitingForDrain.has(socket), - ).toBe(true); - - // Simulate drain — triggers the once("drain") handler which removes - // the socket from clientSocketsWaitingForDrain - fakeSocket.emit("drain"); - - expect( - ( - session as unknown as { - clientSocketsWaitingForDrain: Set; - } - ).clientSocketsWaitingForDrain.has(socket), - ).toBe(false); - - // After drain, new broadcasts write again - broadcast("frame-5"); - expect(fakeSocket.writes).toHaveLength(3); - expect(fakeSocket.writes[2]).toContain("frame-5"); - }); - - it("continues delivering data, exit, and error events while waiting for drain", () => { - const { socket, broadcast, broadcastEvent } = createSessionWithSocket( - (_msg, writes) => writes.length <= 1, - ); - const fakeSocket = socket as unknown as FakeSocket; - - broadcast("frame-1"); - broadcast("frame-2"); - broadcast("frame-3"); - expect(fakeSocket.writes).toHaveLength(3); - expect(fakeSocket.writes[2]).toContain("frame-3"); - - broadcastEvent("exit", { type: "exit", exitCode: 0 }); - broadcastEvent("error", { type: "error", error: "boom" }); - - expect(fakeSocket.writes).toHaveLength(5); - expect(fakeSocket.writes[3]).toContain('"event":"exit"'); - expect(fakeSocket.writes[3]).toContain('"exitCode":0'); - expect(fakeSocket.writes[4]).toContain('"event":"error"'); - expect(fakeSocket.writes[4]).toContain('"error":"boom"'); - }); - - it("emits only one backpressure warning while socket remains backpressured", () => { - const { broadcast } = createSessionWithSocket(() => false); - - for (let i = 0; i < 1000; i++) { - broadcast(`chunk-${i}`); - } - - const backpressureWarns = (warnSpy.mock.calls as unknown[][]).filter( - (call) => - typeof call[0] === "string" && - call[0].includes("Client socket buffer full"), - ); - - // Should emit exactly 1 warning, not 1000 - expect(backpressureWarns.length).toBe(1); - }); - - it("includes suppressed count when warning resumes after interval", () => { - // Write always returns false so every broadcast triggers backpressure. - // Drain between writes to simulate a client that repeatedly falls behind. - const { session, socket, broadcast } = createSessionWithSocket(() => false); - const fakeSocket = socket as unknown as FakeSocket; - - // First broadcast: writes, gets backpressured, warns immediately - broadcast("first"); - - // Drain + re-broadcast many times within the rate-limit window to - // accumulate suppressed warnings - for (let i = 0; i < 50; i++) { - fakeSocket.emit("drain"); - broadcast(`suppressed-${i}`); - } - - // Advance past the rate-limit window - ( - session as unknown as { backpressureWarnLastAt: number } - ).backpressureWarnLastAt = Date.now() - 10_000; - - // Drain once more, then broadcast — should emit with suppressed count - fakeSocket.emit("drain"); - broadcast("after-interval"); - - const backpressureWarns = (warnSpy.mock.calls as unknown[][]).filter( - (call) => - typeof call[0] === "string" && - call[0].includes("Client socket buffer full"), - ); - - expect(backpressureWarns.length).toBe(2); - const lastWarn = backpressureWarns[1]?.[0] as string; - expect(lastWarn).toContain("suppressed"); - expect(lastWarn).toContain("50"); - }); -}); From a4617b31a37d852c8b81166ac62f94dd3b3e4d6a Mon Sep 17 00:00:00 2001 From: Kiet Ho Date: Sun, 29 Mar 2026 20:16:36 -0700 Subject: [PATCH 5/5] fix(desktop): remove terminal backpressure warning --- .../desktop/src/main/terminal-host/session.ts | 36 ------------------- 1 file changed, 36 deletions(-) diff --git a/apps/desktop/src/main/terminal-host/session.ts b/apps/desktop/src/main/terminal-host/session.ts index 5c5c17a5dba..47402c112a8 100644 --- a/apps/desktop/src/main/terminal-host/session.ts +++ b/apps/desktop/src/main/terminal-host/session.ts @@ -54,12 +54,6 @@ const ATTACH_FLUSH_TIMEOUT_MS = 500; */ const MAX_SUBPROCESS_STDIN_QUEUE_BYTES = 2_000_000; -/** - * Minimum interval (ms) between backpressure warnings per session. - * Prevents log flooding when a high-output pane keeps the socket buffer full. - */ -const BACKPRESSURE_WARN_INTERVAL_MS = 5_000; - /** * How long to wait for the shell-ready marker before unblocking writes. * 15s covers heavy setups like Nix-based devenv via direnv. On timeout, @@ -133,8 +127,6 @@ export class Session { private attachedClients: Map = new Map(); private clientSocketsWaitingForDrain: Set = new Set(); private subprocessStdoutPaused = false; - private backpressureWarnLastAt = 0; - private backpressureWarnSuppressed = 0; private lastAttachedAt: Date; private exitCode: number | null = null; private disposed = false; @@ -1000,8 +992,6 @@ export class Session { this.subprocessStdinQueuedBytes = 0; this.subprocessStdinDrainArmed = false; this.subprocessStdoutPaused = false; - this.backpressureWarnLastAt = 0; - this.backpressureWarnSuppressed = 0; this.emulatorWriteQueue = []; this.emulatorWriteQueuedBytes = 0; @@ -1077,7 +1067,6 @@ export class Session { try { const canWrite = socket.write(message); if (!canWrite) { - this.warnBackpressure(); this.handleClientBackpressure(socket); } } catch { @@ -1114,31 +1103,6 @@ export class Session { this.subprocess.stdout.resume(); } - /** - * Rate-limited backpressure warning to prevent log flooding. - * Emits at most one warning per BACKPRESSURE_WARN_INTERVAL_MS, - * reporting how many warnings were suppressed since the last emission. - */ - private warnBackpressure(): void { - const now = Date.now(); - if (now - this.backpressureWarnLastAt < BACKPRESSURE_WARN_INTERVAL_MS) { - this.backpressureWarnSuppressed++; - return; - } - - const suppressed = this.backpressureWarnSuppressed; - this.backpressureWarnSuppressed = 0; - this.backpressureWarnLastAt = now; - - const suffix = - suppressed > 0 - ? ` (${suppressed} similar warning${suppressed === 1 ? "" : "s"} suppressed)` - : ""; - console.warn( - `[Session ${this.sessionId}] Client socket buffer full, output may be delayed${suffix}`, - ); - } - /** * Get default shell for the platform */